diff options
121 files changed, 1980 insertions, 1220 deletions
diff --git a/.buildkite/worker-blacklist b/.buildkite/worker-blacklist index 158ab79154..094b6c94da 100644 --- a/.buildkite/worker-blacklist +++ b/.buildkite/worker-blacklist @@ -39,3 +39,5 @@ Server correctly handles incoming m.device_list_update # this fails reliably with a torture level of 100 due to https://github.com/matrix-org/synapse/issues/6536 Outbound federation requests missing prev_events and then asks for /state_ids and resolves the state + +Can get rooms/{roomId}/members at a given point diff --git a/CHANGES.md b/CHANGES.md index f776560de7..37b650a848 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,11 +1,35 @@ -Synapse 1.10.0rc4 (2020-02-11) +Synapse 1.10.1 (2020-02-17) +=========================== + +Bugfixes +-------- + +- Fix a bug introduced in Synapse 1.10.0 which would cause room state to be cleared in the database if Synapse was upgraded direct from 1.2.1 or earlier to 1.10.0. ([\#6924](https://github.com/matrix-org/synapse/issues/6924)) + + +Synapse 1.10.0 (2020-02-12) +=========================== + +**WARNING to client developers**: As of this release Synapse validates `client_secret` parameters in the Client-Server API as per the spec. See [\#6766](https://github.com/matrix-org/synapse/issues/6766) for details. + +Updates to the Docker image +--------------------------- + +- Update the docker images to Alpine Linux 3.11. ([\#6897](https://github.com/matrix-org/synapse/issues/6897)) + + +Synapse 1.10.0rc5 (2020-02-11) ============================== -Features +Bugfixes -------- -- Filter out m.room.aliases from /sync state blocks until a full fix lands. ([\#6884](https://github.com/matrix-org/synapse/issues/6884)) +- Fix the filtering introduced in 1.10.0rc3 to also apply to the state blocks returned by `/sync`. ([\#6884](https://github.com/matrix-org/synapse/issues/6884)) +Synapse 1.10.0rc4 (2020-02-11) +============================== + +This release candidate was built incorrectly and is superceded by 1.10.0rc5. Synapse 1.10.0rc3 (2020-02-10) ============================== @@ -13,7 +37,7 @@ Synapse 1.10.0rc3 (2020-02-10) Features -------- -- Filter out m.room.aliases from the CS API to mitigate abuse while a better solution is specced. ([\#6878](https://github.com/matrix-org/synapse/issues/6878)) +- Filter out `m.room.aliases` from the CS API to mitigate abuse while a better solution is specced. ([\#6878](https://github.com/matrix-org/synapse/issues/6878)) Internal Changes @@ -41,9 +65,6 @@ Internal Changes Synapse 1.10.0rc1 (2020-01-31) ============================== -**WARNING to client developers**: As of this release Synapse validates `client_secret` parameters in the Client-Server API as per the spec. See [\#6766](https://github.com/matrix-org/synapse/issues/6766) for details. - - Features -------- diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 5736ede6c4..4b01b6ac8c 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -200,6 +200,20 @@ Git allows you to add this signoff automatically when using the `-s` flag to `git commit`, which uses the name and email set in your `user.name` and `user.email` git configs. +## Merge Strategy + +We use the commit history of develop/master extensively to identify +when regressions were introduced and what changes have been made. + +We aim to have a clean merge history, which means we normally squash-merge +changes into develop. For small changes this means there is no need to rebase +to clean up your PR before merging. Larger changes with an organised set of +commits may be merged as-is, if the history is judged to be useful. + +This use of squash-merging will mean PRs built on each other will be hard to +merge. We suggest avoiding these where possible, and if required, ensuring +each PR has a tidy set of commits to ease merging. + ## Conclusion That's it! Matrix is a very open and collaborative project as you might expect diff --git a/INSTALL.md b/INSTALL.md index d25fcf0753..9fe767704b 100644 --- a/INSTALL.md +++ b/INSTALL.md @@ -388,15 +388,17 @@ Once you have installed synapse as above, you will need to configure it. ## TLS certificates -The default configuration exposes a single HTTP port: http://localhost:8008. It -is suitable for local testing, but for any practical use, you will either need -to enable a reverse proxy, or configure Synapse to expose an HTTPS port. +The default configuration exposes a single HTTP port on the local +interface: `http://localhost:8008`. It is suitable for local testing, +but for any practical use, you will need Synapse's APIs to be served +over HTTPS. -For information on using a reverse proxy, see +The recommended way to do so is to set up a reverse proxy on port +`8448`. You can find documentation on doing so in [docs/reverse_proxy.md](docs/reverse_proxy.md). -To configure Synapse to expose an HTTPS port, you will need to edit -`homeserver.yaml`, as follows: +Alternatively, you can configure Synapse to expose an HTTPS port. To do +so, you will need to edit `homeserver.yaml`, as follows: * First, under the `listeners` section, uncomment the configuration for the TLS-enabled listener. (Remove the hash sign (`#`) at the start of @@ -414,11 +416,15 @@ To configure Synapse to expose an HTTPS port, you will need to edit point these settings at an existing certificate and key, or you can enable Synapse's built-in ACME (Let's Encrypt) support. Instructions for having Synapse automatically provision and renew federation - certificates through ACME can be found at [ACME.md](docs/ACME.md). If you - are using your own certificate, be sure to use a `.pem` file that includes - the full certificate chain including any intermediate certificates (for - instance, if using certbot, use `fullchain.pem` as your certificate, not - `cert.pem`). + certificates through ACME can be found at [ACME.md](docs/ACME.md). + Note that, as pointed out in that document, this feature will not + work with installs set up after November 2020. + + If you are using your + own certificate, be sure to use a `.pem` file that includes the full + certificate chain including any intermediate certificates (for + instance, if using certbot, use `fullchain.pem` as your certificate, + not `cert.pem`). For a more detailed guide to configuring your server for federation, see [federate.md](docs/federate.md) diff --git a/README.rst b/README.rst index 2691dfc23d..4db7d17e94 100644 --- a/README.rst +++ b/README.rst @@ -272,7 +272,7 @@ to install using pip and a virtualenv:: virtualenv -p python3 env source env/bin/activate - python -m pip install --no-use-pep517 -e .[all] + python -m pip install --no-use-pep517 -e ".[all]" This will run a process of downloading and installing all the needed dependencies into a virtual env. diff --git a/changelog.d/6769.feature b/changelog.d/6769.feature new file mode 100644 index 0000000000..8a60e12907 --- /dev/null +++ b/changelog.d/6769.feature @@ -0,0 +1 @@ +Admin API to add or modify threepids of user accounts. \ No newline at end of file diff --git a/changelog.d/6781.bugfix b/changelog.d/6781.bugfix new file mode 100644 index 0000000000..47cd671bff --- /dev/null +++ b/changelog.d/6781.bugfix @@ -0,0 +1 @@ +Fixed third party event rules function `on_create_room`'s return value being ignored. diff --git a/changelog.d/6821.misc b/changelog.d/6821.misc new file mode 100644 index 0000000000..1d5265d5e2 --- /dev/null +++ b/changelog.d/6821.misc @@ -0,0 +1 @@ +Add type hints to `SyncHandler`. diff --git a/changelog.d/6823.misc b/changelog.d/6823.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6823.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/changelog.d/6825.bugfix b/changelog.d/6825.bugfix new file mode 100644 index 0000000000..d3cacd6d9a --- /dev/null +++ b/changelog.d/6825.bugfix @@ -0,0 +1 @@ +Allow URL-encoded User IDs on `/_synapse/admin/v2/users/<user_id>[/admin]` endpoints. Thanks to @NHAS for reporting. \ No newline at end of file diff --git a/changelog.d/6827.misc b/changelog.d/6827.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6827.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/changelog.d/6833.misc b/changelog.d/6833.misc new file mode 100644 index 0000000000..8a0605f90b --- /dev/null +++ b/changelog.d/6833.misc @@ -0,0 +1 @@ +Reducing log level to DEBUG for synapse.storage.TIME. diff --git a/changelog.d/6834.misc b/changelog.d/6834.misc new file mode 100644 index 0000000000..79acebe516 --- /dev/null +++ b/changelog.d/6834.misc @@ -0,0 +1 @@ +Change the default power levels of invites, tombstones and server ACLs for new rooms. \ No newline at end of file diff --git a/changelog.d/6836.misc b/changelog.d/6836.misc new file mode 100644 index 0000000000..232488e1e5 --- /dev/null +++ b/changelog.d/6836.misc @@ -0,0 +1 @@ +Fix stacktraces when using `ObservableDeferred` and async/await. diff --git a/changelog.d/6837.misc b/changelog.d/6837.misc new file mode 100644 index 0000000000..0496f12de8 --- /dev/null +++ b/changelog.d/6837.misc @@ -0,0 +1 @@ +Port much of `synapse.handlers.federation` to async/await. diff --git a/changelog.d/6840.misc b/changelog.d/6840.misc new file mode 100644 index 0000000000..0496f12de8 --- /dev/null +++ b/changelog.d/6840.misc @@ -0,0 +1 @@ +Port much of `synapse.handlers.federation` to async/await. diff --git a/changelog.d/6844.bugfix b/changelog.d/6844.bugfix new file mode 100644 index 0000000000..e84aa1029f --- /dev/null +++ b/changelog.d/6844.bugfix @@ -0,0 +1 @@ +Fix an issue with cross-signing where device signatures were not sent to remote servers. diff --git a/changelog.d/6846.doc b/changelog.d/6846.doc new file mode 100644 index 0000000000..ad69d608c0 --- /dev/null +++ b/changelog.d/6846.doc @@ -0,0 +1 @@ +Add details of PR merge strategy to contributing docs. \ No newline at end of file diff --git a/changelog.d/6847.misc b/changelog.d/6847.misc new file mode 100644 index 0000000000..094e911adb --- /dev/null +++ b/changelog.d/6847.misc @@ -0,0 +1 @@ +Populate `rooms.room_version` database column at startup, rather than in a background update. diff --git a/changelog.d/6849.bugfix b/changelog.d/6849.bugfix new file mode 100644 index 0000000000..d928a26ec6 --- /dev/null +++ b/changelog.d/6849.bugfix @@ -0,0 +1 @@ +Fix Synapse refusing to start if `federation_certificate_verification_whitelist` option is blank. diff --git a/changelog.d/6854.misc b/changelog.d/6854.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6854.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/changelog.d/6855.misc b/changelog.d/6855.misc new file mode 100644 index 0000000000..904361ddfb --- /dev/null +++ b/changelog.d/6855.misc @@ -0,0 +1 @@ +Update pip install directiosn in readme to avoid error when using zsh. diff --git a/changelog.d/6856.misc b/changelog.d/6856.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6856.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/changelog.d/6857.misc b/changelog.d/6857.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6857.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/changelog.d/6858.misc b/changelog.d/6858.misc new file mode 100644 index 0000000000..08aa80bcd9 --- /dev/null +++ b/changelog.d/6858.misc @@ -0,0 +1 @@ +Refactoring work in preparation for changing the event redaction algorithm. diff --git a/changelog.d/6862.misc b/changelog.d/6862.misc new file mode 100644 index 0000000000..83626d2939 --- /dev/null +++ b/changelog.d/6862.misc @@ -0,0 +1 @@ +Reduce amount we log at `INFO` level. diff --git a/changelog.d/6864.misc b/changelog.d/6864.misc new file mode 100644 index 0000000000..d24eb68460 --- /dev/null +++ b/changelog.d/6864.misc @@ -0,0 +1 @@ +Limit the number of events that can be requested by the backfill federation API to 100. diff --git a/changelog.d/6869.misc b/changelog.d/6869.misc new file mode 100644 index 0000000000..14f88f9bb7 --- /dev/null +++ b/changelog.d/6869.misc @@ -0,0 +1 @@ +Remove unused `get_room_stats_state` method. diff --git a/changelog.d/6871.misc b/changelog.d/6871.misc new file mode 100644 index 0000000000..5161af9983 --- /dev/null +++ b/changelog.d/6871.misc @@ -0,0 +1 @@ +Add typing to `synapse.federation.sender` and port to async/await. diff --git a/changelog.d/6877.removal b/changelog.d/6877.removal new file mode 100644 index 0000000000..9545e31fbe --- /dev/null +++ b/changelog.d/6877.removal @@ -0,0 +1 @@ +Remove `m.lazy_load_members` from `unstable_features` since lazy loading is in the stable Client-Server API version r0.5.0. diff --git a/changelog.d/6882.misc b/changelog.d/6882.misc new file mode 100644 index 0000000000..e8382e36ae --- /dev/null +++ b/changelog.d/6882.misc @@ -0,0 +1 @@ +Reject device display names over 100 characters in length. diff --git a/changelog.d/6883.misc b/changelog.d/6883.misc new file mode 100644 index 0000000000..e0837d7987 --- /dev/null +++ b/changelog.d/6883.misc @@ -0,0 +1 @@ +Add an additional entry to the SyTest blacklist for worker mode. diff --git a/changelog.d/6887.misc b/changelog.d/6887.misc new file mode 100644 index 0000000000..b351d47c7b --- /dev/null +++ b/changelog.d/6887.misc @@ -0,0 +1 @@ +Fix the use of sed in the linting scripts when using BSD sed. diff --git a/changelog.d/6888.feature b/changelog.d/6888.feature new file mode 100644 index 0000000000..1b7ac0c823 --- /dev/null +++ b/changelog.d/6888.feature @@ -0,0 +1 @@ +The result of a user directory search can now be filtered via the spam checker. diff --git a/changelog.d/6891.doc b/changelog.d/6891.doc new file mode 100644 index 0000000000..2f46c385b7 --- /dev/null +++ b/changelog.d/6891.doc @@ -0,0 +1 @@ +Spell out that the last event sent to a room won't be deleted by a purge. diff --git a/changelog.d/6901.misc b/changelog.d/6901.misc new file mode 100644 index 0000000000..b2f12bbe86 --- /dev/null +++ b/changelog.d/6901.misc @@ -0,0 +1 @@ +Return a 404 instead of 200 for querying information of a non-existant user through the admin API. \ No newline at end of file diff --git a/changelog.d/6904.removal b/changelog.d/6904.removal new file mode 100644 index 0000000000..a5cc0c3605 --- /dev/null +++ b/changelog.d/6904.removal @@ -0,0 +1 @@ +Stop sending alias events during adding / removing aliases. Check alt_aliases in the latest canonical aliases event when deleting an alias. diff --git a/changelog.d/6905.doc b/changelog.d/6905.doc new file mode 100644 index 0000000000..be0e698af8 --- /dev/null +++ b/changelog.d/6905.doc @@ -0,0 +1 @@ +Update Synapse's documentation to warn about the deprecation of ACME v1. diff --git a/changelog.d/6906.doc b/changelog.d/6906.doc new file mode 100644 index 0000000000..053b2436ae --- /dev/null +++ b/changelog.d/6906.doc @@ -0,0 +1 @@ +Add documentation for the spam checker. diff --git a/changelog.d/6909.doc b/changelog.d/6909.doc new file mode 100644 index 0000000000..be0e698af8 --- /dev/null +++ b/changelog.d/6909.doc @@ -0,0 +1 @@ +Update Synapse's documentation to warn about the deprecation of ACME v1. diff --git a/changelog.d/6915.misc b/changelog.d/6915.misc new file mode 100644 index 0000000000..3a181ef243 --- /dev/null +++ b/changelog.d/6915.misc @@ -0,0 +1 @@ +Add type hints to the spam checker module. diff --git a/changelog.d/6918.docker b/changelog.d/6918.docker new file mode 100644 index 0000000000..cc2db5e071 --- /dev/null +++ b/changelog.d/6918.docker @@ -0,0 +1 @@ +The deprecated "generate-config-on-the-fly" mode is no longer supported. diff --git a/changelog.d/6919.misc b/changelog.d/6919.misc new file mode 100644 index 0000000000..aa2cd89998 --- /dev/null +++ b/changelog.d/6919.misc @@ -0,0 +1 @@ +Convert the directory handler tests to use HomeserverTestCase. diff --git a/changelog.d/6920.misc b/changelog.d/6920.misc new file mode 100644 index 0000000000..d333add990 --- /dev/null +++ b/changelog.d/6920.misc @@ -0,0 +1 @@ +Add a warning about indentation to generated configuration files. diff --git a/changelog.d/6921.docker b/changelog.d/6921.docker new file mode 100644 index 0000000000..152e723339 --- /dev/null +++ b/changelog.d/6921.docker @@ -0,0 +1 @@ +Databases created using the compose file in contrib/docker will now always have correct encoding and locale settings. Contributed by Fridtjof Mund. diff --git a/changelog.d/6937.misc b/changelog.d/6937.misc new file mode 100644 index 0000000000..6d00e58654 --- /dev/null +++ b/changelog.d/6937.misc @@ -0,0 +1 @@ +Increase perf of `get_auth_chain_ids` used in state res v2. diff --git a/changelog.d/6938.doc b/changelog.d/6938.doc new file mode 100644 index 0000000000..117f76f48a --- /dev/null +++ b/changelog.d/6938.doc @@ -0,0 +1 @@ +Fix worker docs to point `/publicised_groups` API correctly. diff --git a/contrib/docker/docker-compose.yml b/contrib/docker/docker-compose.yml index 2b044baf78..5df29379c8 100644 --- a/contrib/docker/docker-compose.yml +++ b/contrib/docker/docker-compose.yml @@ -56,6 +56,9 @@ services: environment: - POSTGRES_USER=synapse - POSTGRES_PASSWORD=changeme + # ensure the database gets created correctly + # https://github.com/matrix-org/synapse/blob/master/docs/postgres.md#set-up-database + - POSTGRES_INITDB_ARGS="--encoding=UTF-8 --lc-collate=C --lc-ctype=C" volumes: # You may store the database tables in a local folder.. - ./schemas:/var/lib/postgresql/data diff --git a/debian/changelog b/debian/changelog index 74eb29c5ee..90314d36af 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,3 +1,15 @@ +matrix-synapse-py3 (1.10.1) stable; urgency=medium + + * New synapse release 1.10.1. + + -- Synapse Packaging team <packages@matrix.org> Mon, 17 Feb 2020 16:27:28 +0000 + +matrix-synapse-py3 (1.10.0) stable; urgency=medium + + * New synapse release 1.10.0. + + -- Synapse Packaging team <packages@matrix.org> Wed, 12 Feb 2020 12:18:54 +0000 + matrix-synapse-py3 (1.9.1) stable; urgency=medium * New synapse release 1.9.1. diff --git a/docker/Dockerfile b/docker/Dockerfile index e5a0d6d5f6..93d61739ae 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -16,7 +16,7 @@ ARG PYTHON_VERSION=3.7 ### ### Stage 0: builder ### -FROM docker.io/python:${PYTHON_VERSION}-alpine3.10 as builder +FROM docker.io/python:${PYTHON_VERSION}-alpine3.11 as builder # install the OS build deps diff --git a/docker/README.md b/docker/README.md index 9f112a01d0..8c337149ca 100644 --- a/docker/README.md +++ b/docker/README.md @@ -110,12 +110,12 @@ argument to `docker run`. ## Legacy dynamic configuration file support -For backwards-compatibility only, the docker image supports creating a dynamic -configuration file based on environment variables. This is now deprecated, but -is enabled when the `SYNAPSE_SERVER_NAME` variable is set (and `generate` is -not given). +The docker image used to support creating a dynamic configuration file based +on environment variables. This is no longer supported, and an error will be +raised if you try to run synapse without a config file. -To migrate from a dynamic configuration file to a static one, run the docker +It is, however, possible to generate a static configuration file based on +the environment variables that were previously used. To do this, run the docker container once with the environment variables set, and `migrate_config` command line option. For example: @@ -127,15 +127,20 @@ docker run -it --rm \ matrixdotorg/synapse:latest migrate_config ``` -This will generate the same configuration file as the legacy mode used, but -will store it in `/data/homeserver.yaml` instead of a temporary location. You -can then use it as shown above at [Running synapse](#running-synapse). +This will generate the same configuration file as the legacy mode used, and +will store it in `/data/homeserver.yaml`. You can then use it as shown above at +[Running synapse](#running-synapse). + +Note that the defaults used in this configuration file may be different to +those when generating a new config file with `generate`: for example, TLS is +enabled by default in this mode. You are encouraged to inspect the generated +configuration file and edit it to ensure it meets your needs. ## Building the image If you need to build the image from a Synapse checkout, use the following `docker build` command from the repo's root: - + ``` docker build -t matrixdotorg/synapse -f docker/Dockerfile . ``` diff --git a/docker/start.py b/docker/start.py index 97fd247f8f..2a25c9380e 100755 --- a/docker/start.py +++ b/docker/start.py @@ -188,11 +188,6 @@ def main(args, environ): else: ownership = "{}:{}".format(desired_uid, desired_gid) - log( - "Container running as UserID %s:%s, ENV (or defaults) requests %s:%s" - % (os.getuid(), os.getgid(), desired_uid, desired_gid) - ) - if ownership is None: log("Will not perform chmod/su-exec as UserID already matches request") @@ -213,38 +208,30 @@ def main(args, environ): if mode is not None: error("Unknown execution mode '%s'" % (mode,)) - if "SYNAPSE_SERVER_NAME" in environ: - # backwards-compatibility generate-a-config-on-the-fly mode - if "SYNAPSE_CONFIG_PATH" in environ: + config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data") + config_path = environ.get("SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml") + + if not os.path.exists(config_path): + if "SYNAPSE_SERVER_NAME" in environ: error( - "SYNAPSE_SERVER_NAME can only be combined with SYNAPSE_CONFIG_PATH " - "in `generate` or `migrate_config` mode. To start synapse using a " - "config file, unset the SYNAPSE_SERVER_NAME environment variable." + """\ +Config file '%s' does not exist. + +The synapse docker image no longer supports generating a config file on-the-fly +based on environment variables. You can migrate to a static config file by +running with 'migrate_config'. See the README for more details. +""" + % (config_path,) ) - config_path = "/compiled/homeserver.yaml" - log( - "Generating config file '%s' on-the-fly from environment variables.\n" - "Note that this mode is deprecated. You can migrate to a static config\n" - "file by running with 'migrate_config'. See the README for more details." + error( + "Config file '%s' does not exist. You should either create a new " + "config file by running with the `generate` argument (and then edit " + "the resulting file before restarting) or specify the path to an " + "existing config file with the SYNAPSE_CONFIG_PATH variable." % (config_path,) ) - generate_config_from_template("/compiled", config_path, environ, ownership) - else: - config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data") - config_path = environ.get( - "SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml" - ) - if not os.path.exists(config_path): - error( - "Config file '%s' does not exist. You should either create a new " - "config file by running with the `generate` argument (and then edit " - "the resulting file before restarting) or specify the path to an " - "existing config file with the SYNAPSE_CONFIG_PATH variable." - % (config_path,) - ) - log("Starting synapse with config file " + config_path) args = ["python", "-m", synapse_worker, "--config-path", config_path] diff --git a/docs/.sample_config_header.yaml b/docs/.sample_config_header.yaml index e001ef5983..35a591d042 100644 --- a/docs/.sample_config_header.yaml +++ b/docs/.sample_config_header.yaml @@ -1,4 +1,4 @@ -# The config is maintained as an up-to-date snapshot of the default +# This file is maintained as an up-to-date snapshot of the default # homeserver.yaml configuration generated by Synapse. # # It is intended to act as a reference for the default configuration, @@ -10,3 +10,5 @@ # homeserver.yaml. Instead, if you are starting from scratch, please generate # a fresh config using Synapse by following the instructions in INSTALL.md. +################################################################################ + diff --git a/docs/ACME.md b/docs/ACME.md index 9eb18a9cf5..f4c4740476 100644 --- a/docs/ACME.md +++ b/docs/ACME.md @@ -1,12 +1,48 @@ # ACME -Synapse v1.0 will require valid TLS certificates for communication between -servers (port `8448` by default) in addition to those that are client-facing -(port `443`). If you do not already have a valid certificate for your domain, -the easiest way to get one is with Synapse's new ACME support, which will use -the ACME protocol to provision a certificate automatically. Synapse v0.99.0+ -will provision server-to-server certificates automatically for you for free -through [Let's Encrypt](https://letsencrypt.org/) if you tell it to. +From version 1.0 (June 2019) onwards, Synapse requires valid TLS +certificates for communication between servers (by default on port +`8448`) in addition to those that are client-facing (port `443`). To +help homeserver admins fulfil this new requirement, Synapse v0.99.0 +introduced support for automatically provisioning certificates through +[Let's Encrypt](https://letsencrypt.org/) using the ACME protocol. + +## Deprecation of ACME v1 + +In [March 2019](https://community.letsencrypt.org/t/end-of-life-plan-for-acmev1/88430), +Let's Encrypt announced that they were deprecating version 1 of the ACME +protocol, with the plan to disable the use of it for new accounts in +November 2019, and for existing accounts in June 2020. + +Synapse doesn't currently support version 2 of the ACME protocol, which +means that: + +* for existing installs, Synapse's built-in ACME support will continue + to work until June 2020. +* for new installs, this feature will not work at all. + +Either way, it is recommended to move from Synapse's ACME support +feature to an external automated tool such as [certbot](https://github.com/certbot/certbot) +(or browse [this list](https://letsencrypt.org/fr/docs/client-options/) +for an alternative ACME client). + +It's also recommended to use a reverse proxy for the server-facing +communications (more documentation about this can be found +[here](/docs/reverse_proxy.md)) as well as the client-facing ones and +have it serve the certificates. + +In case you can't do that and need Synapse to serve them itself, make +sure to set the `tls_certificate_path` configuration setting to the path +of the certificate (make sure to use the certificate containing the full +certification chain, e.g. `fullchain.pem` if using certbot) and +`tls_private_key_path` to the path of the matching private key. Note +that in this case you will need to restart Synapse after each +certificate renewal so that Synapse stops using the old certificate. + +If you still want to use Synapse's built-in ACME support, the rest of +this document explains how to set it up. + +## Initial setup In the case that your `server_name` config variable is the same as the hostname that the client connects to, then the same certificate can be @@ -32,11 +68,6 @@ If you already have certificates, you will need to back up or delete them (files `example.com.tls.crt` and `example.com.tls.key` in Synapse's root directory), Synapse's ACME implementation will not overwrite them. -You may wish to use alternate methods such as Certbot to obtain a certificate -from Let's Encrypt, depending on your server configuration. Of course, if you -already have a valid certificate for your homeserver's domain, that can be -placed in Synapse's config directory without the need for any ACME setup. - ## ACME setup The main steps for enabling ACME support in short summary are: diff --git a/docs/admin_api/purge_history_api.rst b/docs/admin_api/purge_history_api.rst index f7be226fd9..e2a620c54f 100644 --- a/docs/admin_api/purge_history_api.rst +++ b/docs/admin_api/purge_history_api.rst @@ -8,6 +8,9 @@ Depending on the amount of history being purged a call to the API may take several minutes or longer. During this period users will not be able to paginate further back in the room from the point being purged from. +Note that Synapse requires at least one message in each room, so it will never +delete the last message in a room. + The API is: ``POST /_synapse/admin/v1/purge_history/<room_id>[/<event_id>]`` diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst index 0b3d09d694..6b02d963e6 100644 --- a/docs/admin_api/user_admin_api.rst +++ b/docs/admin_api/user_admin_api.rst @@ -2,7 +2,8 @@ Create or modify Account ======================== This API allows an administrator to create or modify a user account with a -specific ``user_id``. +specific ``user_id``. Be aware that ``user_id`` is fully qualified: for example, +``@user:server.com``. This api is:: @@ -15,6 +16,16 @@ with a body of: { "password": "user_password", "displayname": "User", + "threepids": [ + { + "medium": "email", + "address": "<user_mail_1>" + }, + { + "medium": "email", + "address": "<user_mail_2>" + } + ], "avatar_url": "<avatar_url>", "admin": false, "deactivated": false @@ -23,6 +34,7 @@ with a body of: including an ``access_token`` of a server admin. The parameter ``displayname`` is optional and defaults to ``user_id``. +The parameter ``threepids`` is optional. The parameter ``avatar_url`` is optional. The parameter ``admin`` is optional and defaults to 'false'. The parameter ``deactivated`` is optional and defaults to 'false'. diff --git a/docs/message_retention_policies.md b/docs/message_retention_policies.md index 4300809dfe..1dd60bdad9 100644 --- a/docs/message_retention_policies.md +++ b/docs/message_retention_policies.md @@ -42,6 +42,10 @@ purged according to its room's policy, then the receiving server will process and store that event until it's picked up by the next purge job, though it will always hide it from clients. +Synapse requires at least one message in each room, so it will never +delete the last message in a room. It will, however, hide it from +clients. + ## Server configuration diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 8e8cf513b0..93236daddc 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1,4 +1,4 @@ -# The config is maintained as an up-to-date snapshot of the default +# This file is maintained as an up-to-date snapshot of the default # homeserver.yaml configuration generated by Synapse. # # It is intended to act as a reference for the default configuration, @@ -10,6 +10,16 @@ # homeserver.yaml. Instead, if you are starting from scratch, please generate # a fresh config using Synapse by following the instructions in INSTALL.md. +################################################################################ + +# Configuration file for Synapse. +# +# This is a YAML file: see [1] for a quick introduction. Note in particular +# that *indentation is important*: all the elements of a list or dictionary +# should have the same indentation. +# +# [1] https://docs.ansible.com/ansible/latest/reference_appendices/YAMLSyntax.html + ## Server ## # The domain name of the server, with optional explicit port. diff --git a/docs/spam_checker.md b/docs/spam_checker.md new file mode 100644 index 0000000000..5b5f5000b7 --- /dev/null +++ b/docs/spam_checker.md @@ -0,0 +1,88 @@ +# Handling spam in Synapse + +Synapse has support to customize spam checking behavior. It can plug into a +variety of events and affect how they are presented to users on your homeserver. + +The spam checking behavior is implemented as a Python class, which must be +able to be imported by the running Synapse. + +## Python spam checker class + +The Python class is instantiated with two objects: + +* Any configuration (see below). +* An instance of `synapse.spam_checker_api.SpamCheckerApi`. + +It then implements methods which return a boolean to alter behavior in Synapse. + +There's a generic method for checking every event (`check_event_for_spam`), as +well as some specific methods: + +* `user_may_invite` +* `user_may_create_room` +* `user_may_create_room_alias` +* `user_may_publish_room` + +The details of the each of these methods (as well as their inputs and outputs) +are documented in the `synapse.events.spamcheck.SpamChecker` class. + +The `SpamCheckerApi` class provides a way for the custom spam checker class to +call back into the homeserver internals. It currently implements the following +methods: + +* `get_state_events_in_room` + +### Example + +```python +class ExampleSpamChecker: + def __init__(self, config, api): + self.config = config + self.api = api + + def check_event_for_spam(self, foo): + return False # allow all events + + def user_may_invite(self, inviter_userid, invitee_userid, room_id): + return True # allow all invites + + def user_may_create_room(self, userid): + return True # allow all room creations + + def user_may_create_room_alias(self, userid, room_alias): + return True # allow all room aliases + + def user_may_publish_room(self, userid, room_id): + return True # allow publishing of all rooms + + def check_username_for_spam(self, user_profile): + return False # allow all usernames +``` + +## Configuration + +Modify the `spam_checker` section of your `homeserver.yaml` in the following +manner: + +`module` should point to the fully qualified Python class that implements your +custom logic, e.g. `my_module.ExampleSpamChecker`. + +`config` is a dictionary that gets passed to the spam checker class. + +### Example + +This section might look like: + +```yaml +spam_checker: + module: my_module.ExampleSpamChecker + config: + # Enable or disable a specific option in ExampleSpamChecker. + my_custom_option: true +``` + +## Examples + +The [Mjolnir](https://github.com/matrix-org/mjolnir) project is a full fledged +example using the Synapse spam checking API, including a bot for dynamic +configuration. diff --git a/docs/workers.md b/docs/workers.md index 6f7ec58780..0d84a58958 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -261,7 +261,8 @@ following regular expressions: ^/_matrix/client/versions$ ^/_matrix/client/(api/v1|r0|unstable)/voip/turnServer$ ^/_matrix/client/(api/v1|r0|unstable)/joined_groups$ - ^/_matrix/client/(api/v1|r0|unstable)/get_groups_publicised$ + ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups$ + ^/_matrix/client/(api/v1|r0|unstable)/publicised_groups/ Additionally, the following REST endpoints can be handled for GET requests: @@ -287,8 +288,8 @@ the following regular expressions: ^/_matrix/client/(api/v1|r0|unstable)/user_directory/search$ -When using this worker you must also set `update_user_directory: False` in the -shared configuration file to stop the main synapse running background +When using this worker you must also set `update_user_directory: False` in the +shared configuration file to stop the main synapse running background jobs related to updating the user directory. ### `synapse.app.frontend_proxy` diff --git a/scripts-dev/config-lint.sh b/scripts-dev/config-lint.sh index 677a854c85..189ca66535 100755 --- a/scripts-dev/config-lint.sh +++ b/scripts-dev/config-lint.sh @@ -3,7 +3,8 @@ # Exits with 0 if there are no problems, or another code otherwise. # Fix non-lowercase true/false values -sed -i -E "s/: +True/: true/g; s/: +False/: false/g;" docs/sample_config.yaml +sed -i.bak -E "s/: +True/: true/g; s/: +False/: false/g;" docs/sample_config.yaml +rm docs/sample_config.yaml.bak # Check if anything changed git diff --exit-code docs/sample_config.yaml diff --git a/synapse/__init__.py b/synapse/__init__.py index cc69ff0c41..8313f177d2 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -36,7 +36,7 @@ try: except ImportError: pass -__version__ = "1.10.0rc4" +__version__ = "1.10.1" if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)): # We import here so that we don't have to install a bunch of deps when diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 08619404bb..ba846042c4 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -53,6 +53,18 @@ Missing mandatory `server_name` config option. """ +CONFIG_FILE_HEADER = """\ +# Configuration file for Synapse. +# +# This is a YAML file: see [1] for a quick introduction. Note in particular +# that *indentation is important*: all the elements of a list or dictionary +# should have the same indentation. +# +# [1] https://docs.ansible.com/ansible/latest/reference_appendices/YAMLSyntax.html + +""" + + def path_exists(file_path): """Check if a file exists @@ -344,7 +356,7 @@ class RootConfig(object): str: the yaml config file """ - return "\n\n".join( + return CONFIG_FILE_HEADER + "\n\n".join( dedent(conf) for conf in self.invoke_all( "generate_config_section", @@ -574,8 +586,8 @@ class RootConfig(object): if not path_exists(config_dir_path): os.makedirs(config_dir_path) with open(config_path, "w") as config_file: - config_file.write("# vim:ft=yaml\n\n") config_file.write(config_str) + config_file.write("\n\n# vim:ft=yaml") config_dict = yaml.safe_load(config_str) obj.generate_missing_files(config_dict, config_dir_path) diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 2e9e478a2a..2514b0713d 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -109,6 +109,8 @@ class TlsConfig(Config): fed_whitelist_entries = config.get( "federation_certificate_verification_whitelist", [] ) + if fed_whitelist_entries is None: + fed_whitelist_entries = [] # Support globs (*) in whitelist values self.federation_certificate_verification_whitelist = [] # type: List[str] diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index f813fa2fe7..a842661a90 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -16,13 +16,13 @@ import os from distutils.util import strtobool +from typing import Optional, Type import six from unpaddedbase64 import encode_base64 -from synapse.api.errors import UnsupportedRoomVersionError -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions +from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions from synapse.types import JsonDict from synapse.util.caches import intern_dict from synapse.util.frozenutils import freeze @@ -189,9 +189,15 @@ class EventBase(object): redacts = _event_dict_property("redacts", None) room_id = _event_dict_property("room_id") sender = _event_dict_property("sender") + state_key = _event_dict_property("state_key") + type = _event_dict_property("type") user_id = _event_dict_property("sender") @property + def event_id(self) -> str: + raise NotImplementedError() + + @property def membership(self): return self.content["membership"] @@ -281,10 +287,7 @@ class FrozenEvent(EventBase): else: frozen_dict = event_dict - self.event_id = event_dict["event_id"] - self.type = event_dict["type"] - if "state_key" in event_dict: - self.state_key = event_dict["state_key"] + self._event_id = event_dict["event_id"] super(FrozenEvent, self).__init__( frozen_dict, @@ -294,6 +297,10 @@ class FrozenEvent(EventBase): rejected_reason=rejected_reason, ) + @property + def event_id(self) -> str: + return self._event_id + def __str__(self): return self.__repr__() @@ -332,9 +339,6 @@ class FrozenEventV2(EventBase): frozen_dict = event_dict self._event_id = None - self.type = event_dict["type"] - if "state_key" in event_dict: - self.state_key = event_dict["state_key"] super(FrozenEventV2, self).__init__( frozen_dict, @@ -404,28 +408,7 @@ class FrozenEventV3(FrozenEventV2): return self._event_id -def room_version_to_event_format(room_version): - """Converts a room version string to the event format - - Args: - room_version (str) - - Returns: - int - - Raises: - UnsupportedRoomVersionError if the room version is unknown - """ - v = KNOWN_ROOM_VERSIONS.get(room_version) - - if not v: - # this can happen if support is withdrawn for a room version - raise UnsupportedRoomVersionError() - - return v.event_format - - -def event_type_from_format_version(format_version): +def event_type_from_format_version(format_version: int) -> Type[EventBase]: """Returns the python type to use to construct an Event object for the given event format version. @@ -445,3 +428,14 @@ def event_type_from_format_version(format_version): return FrozenEventV3 else: raise Exception("No event format %r" % (format_version,)) + + +def make_event_from_dict( + event_dict: JsonDict, + room_version: RoomVersion = RoomVersions.V1, + internal_metadata_dict: JsonDict = {}, + rejected_reason: Optional[str] = None, +) -> EventBase: + """Construct an EventBase from the given event dict""" + event_type = event_type_from_format_version(room_version.event_format) + return event_type(event_dict, internal_metadata_dict, rejected_reason) diff --git a/synapse/events/builder.py b/synapse/events/builder.py index 8d63ad6dc3..a0c4a40c27 100644 --- a/synapse/events/builder.py +++ b/synapse/events/builder.py @@ -28,11 +28,7 @@ from synapse.api.room_versions import ( RoomVersion, ) from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import ( - EventBase, - _EventInternalMetadata, - event_type_from_format_version, -) +from synapse.events import EventBase, _EventInternalMetadata, make_event_from_dict from synapse.types import EventID, JsonDict from synapse.util import Clock from synapse.util.stringutils import random_string @@ -256,8 +252,8 @@ def create_local_event_from_event_dict( event_dict.setdefault("signatures", {}) add_hashes_and_signatures(room_version, event_dict, hostname, signing_key) - return event_type_from_format_version(format_version)( - event_dict, internal_metadata_dict=internal_metadata_dict + return make_event_from_dict( + event_dict, room_version, internal_metadata_dict=internal_metadata_dict ) diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index 5a907718d6..a23b6b7b61 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -15,12 +15,17 @@ # limitations under the License. import inspect +from typing import Dict from synapse.spam_checker_api import SpamCheckerApi +MYPY = False +if MYPY: + import synapse.server + class SpamChecker(object): - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self.spam_checker = None module = None @@ -40,7 +45,7 @@ class SpamChecker(object): else: self.spam_checker = module(config=config) - def check_event_for_spam(self, event): + def check_event_for_spam(self, event: "synapse.events.EventBase") -> bool: """Checks if a given event is considered "spammy" by this server. If the server considers an event spammy, then it will be rejected if @@ -48,26 +53,30 @@ class SpamChecker(object): users receive a blank event. Args: - event (synapse.events.EventBase): the event to be checked + event: the event to be checked Returns: - bool: True if the event is spammy. + True if the event is spammy. """ if self.spam_checker is None: return False return self.spam_checker.check_event_for_spam(event) - def user_may_invite(self, inviter_userid, invitee_userid, room_id): + def user_may_invite( + self, inviter_userid: str, invitee_userid: str, room_id: str + ) -> bool: """Checks if a given user may send an invite If this method returns false, the invite will be rejected. Args: - userid (string): The sender's user ID + inviter_userid: The user ID of the sender of the invitation + invitee_userid: The user ID targeted in the invitation + room_id: The room ID Returns: - bool: True if the user may send an invite, otherwise False + True if the user may send an invite, otherwise False """ if self.spam_checker is None: return True @@ -76,52 +85,78 @@ class SpamChecker(object): inviter_userid, invitee_userid, room_id ) - def user_may_create_room(self, userid): + def user_may_create_room(self, userid: str) -> bool: """Checks if a given user may create a room If this method returns false, the creation request will be rejected. Args: - userid (string): The sender's user ID + userid: The ID of the user attempting to create a room Returns: - bool: True if the user may create a room, otherwise False + True if the user may create a room, otherwise False """ if self.spam_checker is None: return True return self.spam_checker.user_may_create_room(userid) - def user_may_create_room_alias(self, userid, room_alias): + def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool: """Checks if a given user may create a room alias If this method returns false, the association request will be rejected. Args: - userid (string): The sender's user ID - room_alias (string): The alias to be created + userid: The ID of the user attempting to create a room alias + room_alias: The alias to be created Returns: - bool: True if the user may create a room alias, otherwise False + True if the user may create a room alias, otherwise False """ if self.spam_checker is None: return True return self.spam_checker.user_may_create_room_alias(userid, room_alias) - def user_may_publish_room(self, userid, room_id): + def user_may_publish_room(self, userid: str, room_id: str) -> bool: """Checks if a given user may publish a room to the directory If this method returns false, the publish request will be rejected. Args: - userid (string): The sender's user ID - room_id (string): The ID of the room that would be published + userid: The user ID attempting to publish the room + room_id: The ID of the room that would be published Returns: - bool: True if the user may publish the room, otherwise False + True if the user may publish the room, otherwise False """ if self.spam_checker is None: return True return self.spam_checker.user_may_publish_room(userid, room_id) + + def check_username_for_spam(self, user_profile: Dict[str, str]) -> bool: + """Checks if a user ID or display name are considered "spammy" by this server. + + If the server considers a username spammy, then it will not be included in + user directory results. + + Args: + user_profile: The user information to check, it contains the keys: + * user_id + * display_name + * avatar_url + + Returns: + True if the user is spammy. + """ + if self.spam_checker is None: + return False + + # For backwards compatibility, if the method does not exist on the spam checker, fallback to not interfering. + checker = getattr(self.spam_checker, "check_username_for_spam", None) + if not checker: + return False + # Make a copy of the user profile object to ensure the spam checker + # cannot modify it. + return checker(user_profile.copy()) diff --git a/synapse/events/third_party_rules.py b/synapse/events/third_party_rules.py index 86f7e5f8aa..459132d388 100644 --- a/synapse/events/third_party_rules.py +++ b/synapse/events/third_party_rules.py @@ -74,15 +74,16 @@ class ThirdPartyEventRules(object): is_requester_admin (bool): If the requester is an admin Returns: - defer.Deferred + defer.Deferred[bool]: Whether room creation is allowed or denied. """ if self.third_party_rules is None: - return + return True - yield self.third_party_rules.on_create_room( + ret = yield self.third_party_rules.on_create_room( requester, config, is_requester_admin ) + return ret @defer.inlineCallbacks def check_threepid_can_be_invited(self, medium, address, room_id): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 0e22183280..eea64c1c9f 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -22,9 +23,13 @@ from twisted.internet.defer import DeferredList from synapse.api.constants import MAX_DEPTH, EventTypes, Membership from synapse.api.errors import Codes, SynapseError -from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions +from synapse.api.room_versions import ( + KNOWN_ROOM_VERSIONS, + EventFormatVersions, + RoomVersion, +) from synapse.crypto.event_signing import check_event_content_hash -from synapse.events import event_type_from_format_version +from synapse.events import EventBase, make_event_from_dict from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( @@ -33,7 +38,7 @@ from synapse.logging.context import ( make_deferred_yieldable, preserve_fn, ) -from synapse.types import get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util import unwrapFirstError logger = logging.getLogger(__name__) @@ -342,16 +347,15 @@ def _is_invite_via_3pid(event): ) -def event_from_pdu_json(pdu_json, event_format_version, outlier=False): - """Construct a FrozenEvent from an event json received over federation +def event_from_pdu_json( + pdu_json: JsonDict, room_version: RoomVersion, outlier: bool = False +) -> EventBase: + """Construct an EventBase from an event json received over federation Args: - pdu_json (object): pdu as received over federation - event_format_version (int): The event format version - outlier (bool): True to mark this event as an outlier - - Returns: - FrozenEvent + pdu_json: pdu as received over federation + room_version: The version of the room this event belongs to + outlier: True to mark this event as an outlier Raises: SynapseError: if the pdu is missing required fields or is otherwise @@ -370,8 +374,7 @@ def event_from_pdu_json(pdu_json, event_format_version, outlier=False): elif depth > MAX_DEPTH: raise SynapseError(400, "Depth too large", Codes.BAD_JSON) - event = event_type_from_format_version(event_format_version)(pdu_json) - + event = make_event_from_dict(pdu_json, room_version) event.internal_metadata.outlier = outlier return event diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index f99d17a7de..4870e39652 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -17,7 +17,18 @@ import copy import itertools import logging -from typing import Dict, Iterable +from typing import ( + Any, + Awaitable, + Callable, + Dict, + Iterable, + List, + Optional, + Sequence, + Tuple, + TypeVar, +) from prometheus_client import Counter @@ -35,12 +46,14 @@ from synapse.api.errors import ( from synapse.api.room_versions import ( KNOWN_ROOM_VERSIONS, EventFormatVersions, + RoomVersion, RoomVersions, ) -from synapse.events import builder, room_version_to_event_format +from synapse.events import EventBase, builder from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.logging.context import make_deferred_yieldable from synapse.logging.utils import log_function +from synapse.types import JsonDict from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.retryutils import NotRetryingDestination @@ -52,6 +65,8 @@ sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["t PDU_RETRY_TIME_MS = 1 * 60 * 1000 +T = TypeVar("T") + class InvalidResponseError(RuntimeError): """Helper for _try_destination_list: indicates that the server returned a response @@ -170,21 +185,17 @@ class FederationClient(FederationBase): sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys(destination, content, timeout) - @defer.inlineCallbacks - @log_function - def backfill(self, dest, room_id, limit, extremities): - """Requests some more historic PDUs for the given context from the + async def backfill( + self, dest: str, room_id: str, limit: int, extremities: Iterable[str] + ) -> List[EventBase]: + """Requests some more historic PDUs for the given room from the given destination server. Args: dest (str): The remote homeserver to ask. room_id (str): The room_id to backfill. - limit (int): The maximum number of PDUs to return. - extremities (list): List of PDU id and origins of the first pdus - we have seen from the context - - Returns: - Deferred: Results in the received PDUs. + limit (int): The maximum number of events to return. + extremities (list): our current backwards extremities, to backfill from """ logger.debug("backfill extrem=%s", extremities) @@ -192,34 +203,37 @@ class FederationClient(FederationBase): if not extremities: return - transaction_data = yield self.transport_layer.backfill( + transaction_data = await self.transport_layer.backfill( dest, room_id, extremities, limit ) logger.debug("backfill transaction_data=%r", transaction_data) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) pdus = [ - event_from_pdu_json(p, format_ver, outlier=False) + event_from_pdu_json(p, room_version, outlier=False) for p in transaction_data["pdus"] ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield make_deferred_yieldable( + pdus[:] = await make_deferred_yieldable( defer.gatherResults( - self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True + self._check_sigs_and_hashes(room_version.identifier, pdus), + consumeErrors=True, ).addErrback(unwrapFirstError) ) return pdus - @defer.inlineCallbacks - @log_function - def get_pdu( - self, destinations, event_id, room_version, outlier=False, timeout=None - ): + async def get_pdu( + self, + destinations: Iterable[str], + event_id: str, + room_version: RoomVersion, + outlier: bool = False, + timeout: Optional[int] = None, + ) -> Optional[EventBase]: """Requests the PDU with given origin and ID from the remote home servers. @@ -227,18 +241,17 @@ class FederationClient(FederationBase): one succeeds. Args: - destinations (list): Which homeservers to query - event_id (str): event to fetch - room_version (str): version of the room - outlier (bool): Indicates whether the PDU is an `outlier`, i.e. if + destinations: Which homeservers to query + event_id: event to fetch + room_version: version of the room + outlier: Indicates whether the PDU is an `outlier`, i.e. if it's from an arbitary point in the context as opposed to part of the current block of PDUs. Defaults to `False` - timeout (int): How long to try (in ms) each destination for before + timeout: How long to try (in ms) each destination for before moving to the next destination. None indicates no timeout. Returns: - Deferred: Results in the requested PDU, or None if we were unable to find - it. + The requested PDU, or None if we were unable to find it. """ # TODO: Rate limit the number of times we try and get the same event. @@ -249,8 +262,6 @@ class FederationClient(FederationBase): pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) - format_ver = room_version_to_event_format(room_version) - signed_pdu = None for destination in destinations: now = self._clock.time_msec() @@ -259,7 +270,7 @@ class FederationClient(FederationBase): continue try: - transaction_data = yield self.transport_layer.get_event( + transaction_data = await self.transport_layer.get_event( destination, event_id, timeout=timeout ) @@ -271,7 +282,7 @@ class FederationClient(FederationBase): ) pdu_list = [ - event_from_pdu_json(p, format_ver, outlier=outlier) + event_from_pdu_json(p, room_version, outlier=outlier) for p in transaction_data["pdus"] ] @@ -279,7 +290,9 @@ class FederationClient(FederationBase): pdu = pdu_list[0] # Check signatures are correct. - signed_pdu = yield self._check_sigs_and_hash(room_version, pdu) + signed_pdu = await self._check_sigs_and_hash( + room_version.identifier, pdu + ) break @@ -309,15 +322,16 @@ class FederationClient(FederationBase): return signed_pdu - @defer.inlineCallbacks - def get_room_state_ids(self, destination: str, room_id: str, event_id: str): + async def get_room_state_ids( + self, destination: str, room_id: str, event_id: str + ) -> Tuple[List[str], List[str]]: """Calls the /state_ids endpoint to fetch the state at a particular point in the room, and the auth events for the given event Returns: - Tuple[List[str], List[str]]: a tuple of (state event_ids, auth event_ids) + a tuple of (state event_ids, auth event_ids) """ - result = yield self.transport_layer.get_room_state_ids( + result = await self.transport_layer.get_room_state_ids( destination, room_id, event_id=event_id ) @@ -331,37 +345,39 @@ class FederationClient(FederationBase): return state_event_ids, auth_event_ids - @defer.inlineCallbacks - @log_function - def get_event_auth(self, destination, room_id, event_id): - res = yield self.transport_layer.get_event_auth(destination, room_id, event_id) + async def get_event_auth(self, destination, room_id, event_id): + res = await self.transport_layer.get_event_auth(destination, room_id, event_id) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) auth_chain = [ - event_from_pdu_json(p, format_ver, outlier=True) for p in res["auth_chain"] + event_from_pdu_json(p, room_version, outlier=True) + for p in res["auth_chain"] ] - signed_auth = yield self._check_sigs_and_hash_and_fetch( - destination, auth_chain, outlier=True, room_version=room_version + signed_auth = await self._check_sigs_and_hash_and_fetch( + destination, auth_chain, outlier=True, room_version=room_version.identifier ) signed_auth.sort(key=lambda e: e.depth) return signed_auth - @defer.inlineCallbacks - def _try_destination_list(self, description, destinations, callback): + async def _try_destination_list( + self, + description: str, + destinations: Iterable[str], + callback: Callable[[str], Awaitable[T]], + ) -> T: """Try an operation on a series of servers, until it succeeds Args: - description (unicode): description of the operation we're doing, for logging + description: description of the operation we're doing, for logging - destinations (Iterable[unicode]): list of server_names to try + destinations: list of server_names to try - callback (callable): Function to run for each server. Passed a single - argument: the server_name to try. May return a deferred. + callback: Function to run for each server. Passed a single + argument: the server_name to try. If the callback raises a CodeMessageException with a 300/400 code, attempts to perform the operation stop immediately and the exception is @@ -372,7 +388,7 @@ class FederationClient(FederationBase): suppressed if the exception is an InvalidResponseError. Returns: - The [Deferred] result of callback, if it succeeds + The result of callback, if it succeeds Raises: SynapseError if the chosen remote server returns a 300/400 code, or @@ -383,7 +399,7 @@ class FederationClient(FederationBase): continue try: - res = yield callback(destination) + res = await callback(destination) return res except InvalidResponseError as e: logger.warning("Failed to %s via %s: %s", description, destination, e) @@ -402,12 +418,12 @@ class FederationClient(FederationBase): ) except Exception: logger.warning( - "Failed to %s via %s", description, destination, exc_info=1 + "Failed to %s via %s", description, destination, exc_info=True ) raise SynapseError(502, "Failed to %s via any server" % (description,)) - def make_membership_event( + async def make_membership_event( self, destinations: Iterable[str], room_id: str, @@ -415,7 +431,7 @@ class FederationClient(FederationBase): membership: str, content: dict, params: Dict[str, str], - ): + ) -> Tuple[str, EventBase, RoomVersion]: """ Creates an m.room.member event, with context, without participating in the room. @@ -436,19 +452,19 @@ class FederationClient(FederationBase): content: Any additional data to put into the content field of the event. params: Query parameters to include in the request. - Return: - Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of + + Returns: `(origin, event, room_version)` where origin is the remote homeserver which generated the event, and room_version is the version of the room. - Fails with a `UnsupportedRoomVersionError` if remote responds with - a room version we don't understand. + Raises: + UnsupportedRoomVersionError: if remote responds with + a room version we don't understand. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ valid_memberships = {Membership.JOIN, Membership.LEAVE} if membership not in valid_memberships: @@ -457,9 +473,8 @@ class FederationClient(FederationBase): % (membership, ",".join(valid_memberships)) ) - @defer.inlineCallbacks - def send_request(destination): - ret = yield self.transport_layer.make_membership_event( + async def send_request(destination: str) -> Tuple[str, EventBase, RoomVersion]: + ret = await self.transport_layer.make_membership_event( destination, room_id, user_id, membership, params ) @@ -492,88 +507,83 @@ class FederationClient(FederationBase): event_dict=pdu_dict, ) - return (destination, ev, room_version) + return destination, ev, room_version - return self._try_destination_list( + return await self._try_destination_list( "make_" + membership, destinations, send_request ) - def send_join(self, destinations, pdu, event_format_version): + async def send_join( + self, destinations: Iterable[str], pdu: EventBase, room_version: RoomVersion + ) -> Dict[str, Any]: """Sends a join event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, and send the event out to the rest of the federation. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent - event_format_version (int): The event format version + pdu: event to be sent + room_version: the version of the room (according to the server that + did the make_join) - Return: - Deferred: resolves to a dict with members ``origin`` (a string - giving the serer the event was sent to, ``state`` (?) and + Returns: + a dict with members ``origin`` (a string + giving the server the event was sent to, ``state`` (?) and ``auth_chain``. - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError: if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError: if no servers were reachable. """ - def check_authchain_validity(signed_auth_chain): - for e in signed_auth_chain: - if e.type == EventTypes.Create: - create_event = e - break - else: - raise InvalidResponseError("no %s in auth chain" % (EventTypes.Create,)) - - # the room version should be sane. - room_version = create_event.content.get("room_version", "1") - if room_version not in KNOWN_ROOM_VERSIONS: - # This shouldn't be possible, because the remote server should have - # rejected the join attempt during make_join. - raise InvalidResponseError( - "room appears to have unsupported version %s" % (room_version,) - ) - - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_join(destination, pdu) + async def send_request(destination) -> Dict[str, Any]: + content = await self._do_send_join(destination, pdu) logger.debug("Got content: %s", content) state = [ - event_from_pdu_json(p, event_format_version, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("state", []) ] auth_chain = [ - event_from_pdu_json(p, event_format_version, outlier=True) + event_from_pdu_json(p, room_version, outlier=True) for p in content.get("auth_chain", []) ] pdus = {p.event_id: p for p in itertools.chain(state, auth_chain)} - room_version = None + create_event = None for e in state: if (e.type, e.state_key) == (EventTypes.Create, ""): - room_version = e.content.get( - "room_version", RoomVersions.V1.identifier - ) + create_event = e break - if room_version is None: + if create_event is None: # If the state doesn't have a create event then the room is # invalid, and it would fail auth checks anyway. raise SynapseError(400, "No create event in state") - valid_pdus = yield self._check_sigs_and_hash_and_fetch( + # the room version should be sane. + create_room_version = create_event.content.get( + "room_version", RoomVersions.V1.identifier + ) + if create_room_version != room_version.identifier: + # either the server that fulfilled the make_join, or the server that is + # handling the send_join, is lying. + raise InvalidResponseError( + "Unexpected room version %s in create event" + % (create_room_version,) + ) + + valid_pdus = await self._check_sigs_and_hash_and_fetch( destination, list(pdus.values()), outlier=True, - room_version=room_version, + room_version=room_version.identifier, ) valid_pdus_map = {p.event_id: p for p in valid_pdus} @@ -597,7 +607,17 @@ class FederationClient(FederationBase): for s in signed_state: s.internal_metadata = copy.deepcopy(s.internal_metadata) - check_authchain_validity(signed_auth) + # double-check that the same create event has ended up in the auth chain + auth_chain_create_events = [ + e.event_id + for e in signed_auth + if (e.type, e.state_key) == (EventTypes.Create, "") + ] + if auth_chain_create_events != [create_event.event_id]: + raise InvalidResponseError( + "Unexpected create event(s) in auth chain" + % (auth_chain_create_events,) + ) return { "state": signed_state, @@ -605,14 +625,13 @@ class FederationClient(FederationBase): "origin": destination, } - return self._try_destination_list("send_join", destinations, send_request) + return await self._try_destination_list("send_join", destinations, send_request) - @defer.inlineCallbacks - def _do_send_join(self, destination, pdu): + async def _do_send_join(self, destination: str, pdu: EventBase): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_join_v2( + content = await self.transport_layer.send_join_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -634,7 +653,7 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_join_v1( + resp = await self.transport_layer.send_join_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -645,51 +664,45 @@ class FederationClient(FederationBase): # content. return resp[1] - @defer.inlineCallbacks - def send_invite(self, destination, room_id, event_id, pdu): - room_version = yield self.store.get_room_version_id(room_id) + async def send_invite( + self, destination: str, room_id: str, event_id: str, pdu: EventBase, + ) -> EventBase: + room_version = await self.store.get_room_version(room_id) - content = yield self._do_send_invite(destination, pdu, room_version) + content = await self._do_send_invite(destination, pdu, room_version) pdu_dict = content["event"] logger.debug("Got response to send_invite: %s", pdu_dict) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - - pdu = event_from_pdu_json(pdu_dict, format_ver) + pdu = event_from_pdu_json(pdu_dict, room_version) # Check signatures are correct. - pdu = yield self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) # FIXME: We should handle signature failures more gracefully. return pdu - @defer.inlineCallbacks - def _do_send_invite(self, destination, pdu, room_version): + async def _do_send_invite( + self, destination: str, pdu: EventBase, room_version: RoomVersion + ) -> JsonDict: """Actually sends the invite, first trying v2 API and falling back to v1 API if necessary. - Args: - destination (str): Target server - pdu (FrozenEvent) - room_version (str) - Returns: - dict: The event as a dict as returned by the remote server + The event as a dict as returned by the remote server """ time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_invite_v2( + content = await self.transport_layer.send_invite_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, content={ "event": pdu.get_pdu_json(time_now), - "room_version": room_version, + "room_version": room_version.identifier, "invite_room_state": pdu.unsigned.get("invite_room_state", []), }, ) @@ -707,8 +720,7 @@ class FederationClient(FederationBase): # Otherwise, we assume that the remote server doesn't understand # the v2 invite API. That's ok provided the room uses old-style event # IDs. - v = KNOWN_ROOM_VERSIONS.get(room_version) - if v.event_format != EventFormatVersions.V1: + if room_version.event_format != EventFormatVersions.V1: raise SynapseError( 400, "User's homeserver does not support this room version", @@ -722,7 +734,7 @@ class FederationClient(FederationBase): # Didn't work, try v1 API. # Note the v1 API returns a tuple of `(200, content)` - _, content = yield self.transport_layer.send_invite_v1( + _, content = await self.transport_layer.send_invite_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -730,7 +742,7 @@ class FederationClient(FederationBase): ) return content - def send_leave(self, destinations, pdu): + async def send_leave(self, destinations: Iterable[str], pdu: EventBase) -> None: """Sends a leave event to one of a list of homeservers. Doing so will cause the remote server to add the event to the graph, @@ -739,34 +751,29 @@ class FederationClient(FederationBase): This is mostly useful to reject received invites. Args: - destinations (str): Candidate homeservers which are probably + destinations: Candidate homeservers which are probably participating in the room. - pdu (BaseEvent): event to be sent + pdu: event to be sent - Return: - Deferred: resolves to None. - - Fails with a ``SynapseError`` if the chosen remote server - returns a 300/400 code. + Raises: + SynapseError if the chosen remote server returns a 300/400 code. - Fails with a ``RuntimeError`` if no servers were reachable. + RuntimeError if no servers were reachable. """ - @defer.inlineCallbacks - def send_request(destination): - content = yield self._do_send_leave(destination, pdu) - + async def send_request(destination: str) -> None: + content = await self._do_send_leave(destination, pdu) logger.debug("Got content: %s", content) - return None - return self._try_destination_list("send_leave", destinations, send_request) + return await self._try_destination_list( + "send_leave", destinations, send_request + ) - @defer.inlineCallbacks - def _do_send_leave(self, destination, pdu): + async def _do_send_leave(self, destination, pdu): time_now = self._clock.time_msec() try: - content = yield self.transport_layer.send_leave_v2( + content = await self.transport_layer.send_leave_v2( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -788,7 +795,7 @@ class FederationClient(FederationBase): logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API") - resp = yield self.transport_layer.send_leave_v1( + resp = await self.transport_layer.send_leave_v1( destination=destination, room_id=pdu.room_id, event_id=pdu.event_id, @@ -820,34 +827,33 @@ class FederationClient(FederationBase): third_party_instance_id=third_party_instance_id, ) - @defer.inlineCallbacks - def get_missing_events( + async def get_missing_events( self, - destination, - room_id, - earliest_events_ids, - latest_events, - limit, - min_depth, - timeout, - ): + destination: str, + room_id: str, + earliest_events_ids: Sequence[str], + latest_events: Iterable[EventBase], + limit: int, + min_depth: int, + timeout: int, + ) -> List[EventBase]: """Tries to fetch events we are missing. This is called when we receive an event without having received all of its ancestors. Args: - destination (str) - room_id (str) - earliest_events_ids (list): List of event ids. Effectively the + destination + room_id + earliest_events_ids: List of event ids. Effectively the events we expected to receive, but haven't. `get_missing_events` should only return events that didn't happen before these. - latest_events (list): List of events we have received that we don't + latest_events: List of events we have received that we don't have all previous events for. - limit (int): Maximum number of events to return. - min_depth (int): Minimum depth of events tor return. - timeout (int): Max time to wait in ms + limit: Maximum number of events to return. + min_depth: Minimum depth of events to return. + timeout: Max time to wait in ms """ try: - content = yield self.transport_layer.get_missing_events( + content = await self.transport_layer.get_missing_events( destination=destination, room_id=room_id, earliest_events=earliest_events_ids, @@ -857,15 +863,14 @@ class FederationClient(FederationBase): timeout=timeout, ) - room_version = yield self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) events = [ - event_from_pdu_json(e, format_ver) for e in content.get("events", []) + event_from_pdu_json(e, room_version) for e in content.get("events", []) ] - signed_events = yield self._check_sigs_and_hash_and_fetch( - destination, events, outlier=False, room_version=room_version + signed_events = await self._check_sigs_and_hash_and_fetch( + destination, events, outlier=False, room_version=room_version.identifier ) except HttpResponseException as e: if not e.code == 400: diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index b3e4db507e..7f9da49326 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -38,7 +38,6 @@ from synapse.api.errors import ( UnsupportedRoomVersionError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS -from synapse.events import room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction @@ -54,7 +53,7 @@ from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, ) -from synapse.types import get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util import glob_to_regex, unwrapFirstError from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache @@ -236,24 +235,17 @@ class FederationServer(FederationBase): continue try: - room_version = await self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version(room_id) except NotFoundError: logger.info("Ignoring PDU for unknown room_id: %s", room_id) continue - - try: - format_ver = room_version_to_event_format(room_version) - except UnsupportedRoomVersionError: + except UnsupportedRoomVersionError as e: # this can happen if support for a given room version is withdrawn, # so that we still get events for said room. - logger.info( - "Ignoring PDU for room %s with unknown version %s", - room_id, - room_version, - ) + logger.info("Ignoring PDU: %s", e) continue - event = event_from_pdu_json(p, format_ver) + event = event_from_pdu_json(p, room_version) pdus_by_room.setdefault(room_id, []).append(event) pdu_results = {} @@ -304,7 +296,12 @@ class FederationServer(FederationBase): async def _process_edu(edu_dict): received_edus_counter.inc() - edu = Edu(**edu_dict) + edu = Edu( + origin=origin, + destination=self.server_name, + edu_type=edu_dict["edu_type"], + content=edu_dict["content"], + ) await self.registry.on_edu(edu.edu_type, origin, edu.content) await concurrently_execute( @@ -398,20 +395,21 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} - async def on_invite_request(self, origin, content, room_version): - if room_version not in KNOWN_ROOM_VERSIONS: + async def on_invite_request( + self, origin: str, content: JsonDict, room_version_id: str + ): + room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) + if not room_version: raise SynapseError( 400, "Homeserver does not support this room version", Codes.UNSUPPORTED_ROOM_VERSION, ) - format_ver = room_version_to_event_format(room_version) - - pdu = event_from_pdu_json(content, format_ver) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version) time_now = self._clock.time_msec() return {"event": ret_pdu.get_pdu_json(time_now)} @@ -419,16 +417,15 @@ class FederationServer(FederationBase): async def on_send_join_request(self, origin, content, room_id): logger.debug("on_send_join_request: content: %s", content) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - pdu = event_from_pdu_json(content, format_ver) + room_version = await self.store.get_room_version(room_id) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) res_pdus = await self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() @@ -450,16 +447,15 @@ class FederationServer(FederationBase): async def on_send_leave_request(self, origin, content, room_id): logger.debug("on_send_leave_request: content: %s", content) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) - pdu = event_from_pdu_json(content, format_ver) + room_version = await self.store.get_room_version(room_id) + pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version, pdu) + pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) await self.handler.on_send_leave_request(origin, pdu) return {} @@ -497,15 +493,14 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - room_version = await self.store.get_room_version_id(room_id) - format_ver = room_version_to_event_format(room_version) + room_version = await self.store.get_room_version(room_id) auth_chain = [ - event_from_pdu_json(e, format_ver) for e in content["auth_chain"] + event_from_pdu_json(e, room_version) for e in content["auth_chain"] ] signed_auth = await self._check_sigs_and_hash_and_fetch( - origin, auth_chain, outlier=True, room_version=room_version + origin, auth_chain, outlier=True, room_version=room_version.identifier ) ret = await self.handler.on_query_auth( @@ -573,7 +568,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) - logger.info( + logger.debug( "on_get_missing_events: earliest_events: %r, latest_events: %r," " limit: %d", earliest_events, @@ -586,11 +581,11 @@ class FederationServer(FederationBase): ) if len(missing_events) < 5: - logger.info( + logger.debug( "Returning %d events: %r", len(missing_events), missing_events ) else: - logger.info("Returning %d events", len(missing_events)) + logger.debug("Returning %d events", len(missing_events)) time_now = self._clock.time_msec() diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index edf5d265e6..8b8d67f7bf 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,6 +14,7 @@ # limitations under the License. import logging +from typing import Dict, Hashable, Iterable, List, Optional, Set from six import itervalues @@ -23,6 +24,7 @@ from twisted.internet import defer import synapse import synapse.metrics +from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu @@ -39,6 +41,8 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage.presence import UserPresenceState +from synapse.types import ReadReceipt from synapse.util.metrics import Measure, measure_func logger = logging.getLogger(__name__) @@ -68,7 +72,7 @@ class FederationSender(object): self._transaction_manager = TransactionManager(hs) # map from destination to PerDestinationQueue - self._per_destination_queues = {} # type: dict[str, PerDestinationQueue] + self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] LaterGauge( "synapse_federation_transaction_queue_pending_destinations", @@ -84,7 +88,7 @@ class FederationSender(object): # Map of user_id -> UserPresenceState for all the pending presence # to be sent out by user_id. Entries here get processed and put in # pending_presence_by_dest - self.pending_presence = {} + self.pending_presence = {} # type: Dict[str, UserPresenceState] LaterGauge( "synapse_federation_transaction_queue_pending_pdus", @@ -116,20 +120,17 @@ class FederationSender(object): # and that there is a pending call to _flush_rrs_for_room in the system. self._queues_awaiting_rr_flush_by_room = ( {} - ) # type: dict[str, set[PerDestinationQueue]] + ) # type: Dict[str, Set[PerDestinationQueue]] self._rr_txn_interval_per_room_ms = ( - 1000.0 / hs.get_config().federation_rr_transactions_per_room_per_second + 1000.0 / hs.config.federation_rr_transactions_per_room_per_second ) - def _get_per_destination_queue(self, destination): + def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination Args: - destination (str): server_name of remote server - - Returns: - PerDestinationQueue + destination: server_name of remote server """ queue = self._per_destination_queues.get(destination) if not queue: @@ -137,7 +138,7 @@ class FederationSender(object): self._per_destination_queues[destination] = queue return queue - def notify_new_events(self, current_id): + def notify_new_events(self, current_id: int) -> None: """This gets called when we have some new events we might want to send out to other servers. """ @@ -151,8 +152,7 @@ class FederationSender(object): "process_event_queue_for_federation", self._process_event_queue_loop ) - @defer.inlineCallbacks - def _process_event_queue_loop(self): + async def _process_event_queue_loop(self): loop_start_time = self.clock.time_msec() try: self._is_processing = True @@ -171,8 +171,8 @@ class FederationSender(object): ) self._transaction_manager.deprioritise_transmission = True - last_token = yield self.store.get_federation_out_pos("events") - next_token, events = yield self.store.get_all_new_events_stream( + last_token = await self.store.get_federation_out_pos("events") + next_token, events = await self.store.get_all_new_events_stream( last_token, self._last_poked_id, limit=100 ) @@ -181,8 +181,7 @@ class FederationSender(object): if not events and next_token >= self._last_poked_id: break - @defer.inlineCallbacks - def handle_event(event): + async def handle_event(event: EventBase) -> None: # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) @@ -199,7 +198,7 @@ class FederationSender(object): # Otherwise if the last member on a server in a room is # banned then it won't receive the event because it won't # be in the room after the ban. - destinations = yield self.state.get_hosts_in_room_at_events( + destinations = await self.state.get_hosts_in_room_at_events( event.room_id, event_ids=event.prev_event_ids() ) except Exception: @@ -221,17 +220,16 @@ class FederationSender(object): self._send_pdu(event, destinations) - @defer.inlineCallbacks - def handle_room_events(events): + async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): for event in events: - yield handle_event(event) + await handle_event(event) - events_by_room = {} + events_by_room = {} # type: Dict[str, List[EventBase]] for event in events: events_by_room.setdefault(event.room_id, []).append(event) - yield make_deferred_yieldable( + await make_deferred_yieldable( defer.gatherResults( [ run_in_background(handle_room_events, evs) @@ -241,11 +239,11 @@ class FederationSender(object): ) ) - yield self.store.update_federation_out_pos("events", next_token) + await self.store.update_federation_out_pos("events", next_token) if events: now = self.clock.time_msec() - ts = yield self.store.get_received_ts(events[-1].event_id) + ts = await self.store.get_received_ts(events[-1].event_id) synapse.metrics.event_processing_lag.labels( "federation_sender" @@ -272,7 +270,7 @@ class FederationSender(object): logger.info("Event queue caught up: re-prioritising transmission") self._transaction_manager.deprioritise_transmission = False - def _send_pdu(self, pdu, destinations): + def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -294,11 +292,11 @@ class FederationSender(object): self._get_per_destination_queue(destination).send_pdu(pdu, order) @defer.inlineCallbacks - def send_read_receipt(self, receipt): + def send_read_receipt(self, receipt: ReadReceipt): """Send a RR to any other servers in the room Args: - receipt (synapse.types.ReadReceipt): receipt to be sent + receipt: receipt to be sent """ # Some background on the rate-limiting going on here. @@ -361,7 +359,7 @@ class FederationSender(object): else: queue.flush_read_receipts_for_room(room_id) - def _schedule_rr_flush_for_room(self, room_id, n_domains): + def _schedule_rr_flush_for_room(self, room_id: str, n_domains: int) -> None: # that is going to cause approximately len(domains) transactions, so now back # off for that multiplied by RR_TXN_INTERVAL_PER_ROOM backoff_ms = self._rr_txn_interval_per_room_ms * n_domains @@ -370,7 +368,7 @@ class FederationSender(object): self.clock.call_later(backoff_ms, self._flush_rrs_for_room, room_id) self._queues_awaiting_rr_flush_by_room[room_id] = set() - def _flush_rrs_for_room(self, room_id): + def _flush_rrs_for_room(self, room_id: str) -> None: queues = self._queues_awaiting_rr_flush_by_room.pop(room_id) logger.debug("Flushing RRs in %s to %s", room_id, queues) @@ -386,14 +384,11 @@ class FederationSender(object): @preserve_fn # the caller should not yield on this @defer.inlineCallbacks - def send_presence(self, states): + def send_presence(self, states: List[UserPresenceState]): """Send the new presence states to the appropriate destinations. This actually queues up the presence states ready for sending and triggers a background task to process them and send out the transactions. - - Args: - states (list(UserPresenceState)) """ if not self.hs.config.use_presence: # No-op if presence is disabled. @@ -430,11 +425,10 @@ class FederationSender(object): finally: self._processing_pending_presence = False - def send_presence_to_destinations(self, states, destinations): + def send_presence_to_destinations( + self, states: List[UserPresenceState], destinations: List[str] + ) -> None: """Send the given presence states to the given destinations. - - Args: - states (list[UserPresenceState]) destinations (list[str]) """ @@ -449,12 +443,9 @@ class FederationSender(object): @measure_func("txnqueue._process_presence") @defer.inlineCallbacks - def _process_presence_inner(self, states): + def _process_presence_inner(self, states: List[UserPresenceState]): """Given a list of states populate self.pending_presence_by_dest and poke to send a new transaction to each destination - - Args: - states (list(UserPresenceState)) """ hosts_and_states = yield get_interested_remotes(self.store, states, self.state) @@ -464,14 +455,20 @@ class FederationSender(object): continue self._get_per_destination_queue(destination).send_presence(states) - def build_and_send_edu(self, destination, edu_type, content, key=None): + def build_and_send_edu( + self, + destination: str, + edu_type: str, + content: dict, + key: Optional[Hashable] = None, + ): """Construct an Edu object, and queue it for sending Args: - destination (str): name of server to send to - edu_type (str): type of EDU to send - content (dict): content of EDU - key (Any|None): clobbering key for this edu + destination: name of server to send to + edu_type: type of EDU to send + content: content of EDU + key: clobbering key for this edu """ if destination == self.server_name: logger.info("Not sending EDU to ourselves") @@ -486,12 +483,12 @@ class FederationSender(object): self.send_edu(edu, key) - def send_edu(self, edu, key): + def send_edu(self, edu: Edu, key: Optional[Hashable]): """Queue an EDU for sending Args: - edu (Edu): edu to send - key (Any|None): clobbering key for this edu + edu: edu to send + key: clobbering key for this edu """ queue = self._get_per_destination_queue(edu.destination) if key: @@ -499,7 +496,7 @@ class FederationSender(object): else: queue.send_edu(edu) - def send_device_messages(self, destination): + def send_device_messages(self, destination: str): if destination == self.server_name: logger.warning("Not sending device update to ourselves") return @@ -519,5 +516,5 @@ class FederationSender(object): self._get_per_destination_queue(destination).attempt_new_transaction() - def get_current_token(self): + def get_current_token(self) -> int: return 0 diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 74f81d8260..c6f8e5ba1e 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -16,11 +16,11 @@ import datetime import logging import random +from typing import Dict, Hashable, Iterable, List, Tuple from prometheus_client import Counter -from twisted.internet import defer - +import synapse.server from synapse.api.errors import ( FederationDeniedError, HttpResponseException, @@ -32,7 +32,7 @@ from synapse.handlers.presence import format_user_presence_state from synapse.metrics import sent_transactions_counter from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState -from synapse.types import StateMap +from synapse.types import ReadReceipt from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter # This is defined in the Matrix spec and enforced by the receiver. @@ -59,13 +59,18 @@ class PerDestinationQueue(object): Manages the per-destination transmission queues. Args: - hs (synapse.HomeServer): - transaction_sender (TransactionManager): - destination (str): the server_name of the destination that we are managing + hs + transaction_sender + destination: the server_name of the destination that we are managing transmission for. """ - def __init__(self, hs, transaction_manager, destination): + def __init__( + self, + hs: "synapse.server.HomeServer", + transaction_manager: "synapse.federation.sender.TransactionManager", + destination: str, + ): self._server_name = hs.hostname self._clock = hs.get_clock() self._store = hs.get_datastore() @@ -75,20 +80,20 @@ class PerDestinationQueue(object): self.transmission_loop_running = False # a list of tuples of (pending pdu, order) - self._pending_pdus = [] # type: list[tuple[EventBase, int]] - self._pending_edus = [] # type: list[Edu] + self._pending_pdus = [] # type: List[Tuple[EventBase, int]] + self._pending_edus = [] # type: List[Edu] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: StateMap[Edu] + self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu] # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: dict[str, UserPresenceState] + self._pending_presence = {} # type: Dict[str, UserPresenceState] # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs = {} + self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]] self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -98,50 +103,50 @@ class PerDestinationQueue(object): # stream_id of last successfully sent device list update. self._last_device_list_stream_id = 0 - def __str__(self): + def __str__(self) -> str: return "PerDestinationQueue[%s]" % self._destination - def pending_pdu_count(self): + def pending_pdu_count(self) -> int: return len(self._pending_pdus) - def pending_edu_count(self): + def pending_edu_count(self) -> int: return ( len(self._pending_edus) + len(self._pending_presence) + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu, order): + def send_pdu(self, pdu: EventBase, order: int) -> None: """Add a PDU to the queue, and start the transmission loop if neccessary Args: - pdu (EventBase): pdu to send - order (int): + pdu: pdu to send + order """ self._pending_pdus.append((pdu, order)) self.attempt_new_transaction() - def send_presence(self, states): + def send_presence(self, states: Iterable[UserPresenceState]) -> None: """Add presence updates to the queue. Start the transmission loop if neccessary. Args: - states (iterable[UserPresenceState]): presence to send + states: presence to send """ self._pending_presence.update({state.user_id: state for state in states}) self.attempt_new_transaction() - def queue_read_receipt(self, receipt): + def queue_read_receipt(self, receipt: ReadReceipt) -> None: """Add a RR to the list to be sent. Doesn't start the transmission loop yet (see flush_read_receipts_for_room) Args: - receipt (synapse.api.receipt_info.ReceiptInfo): receipt to be queued + receipt: receipt to be queued """ self._pending_rrs.setdefault(receipt.room_id, {}).setdefault( receipt.receipt_type, {} )[receipt.user_id] = {"event_ids": receipt.event_ids, "data": receipt.data} - def flush_read_receipts_for_room(self, room_id): + def flush_read_receipts_for_room(self, room_id: str) -> None: # if we don't have any read-receipts for this room, it may be that we've already # sent them out, so we don't need to flush. if room_id not in self._pending_rrs: @@ -149,15 +154,15 @@ class PerDestinationQueue(object): self._rrs_pending_flush = True self.attempt_new_transaction() - def send_keyed_edu(self, edu, key): + def send_keyed_edu(self, edu: Edu, key: Hashable) -> None: self._pending_edus_keyed[(edu.edu_type, key)] = edu self.attempt_new_transaction() - def send_edu(self, edu): + def send_edu(self, edu) -> None: self._pending_edus.append(edu) self.attempt_new_transaction() - def attempt_new_transaction(self): + def attempt_new_transaction(self) -> None: """Try to start a new transaction to this destination If there is already a transaction in progress to this destination, @@ -180,16 +185,15 @@ class PerDestinationQueue(object): self._transaction_transmission_loop, ) - @defer.inlineCallbacks - def _transaction_transmission_loop(self): - pending_pdus = [] + async def _transaction_transmission_loop(self) -> None: + pending_pdus = [] # type: List[Tuple[EventBase, int]] try: self.transmission_loop_running = True # This will throw if we wouldn't retry. We do this here so we fail # quickly, but we will later check this again in the http client, # hence why we throw the result away. - yield get_retry_limiter(self._destination, self._clock, self._store) + await get_retry_limiter(self._destination, self._clock, self._store) pending_pdus = [] while True: @@ -203,12 +207,12 @@ class PerDestinationQueue(object): logger.info( "TX [%s]: sleeping for %f seconds", self._destination, sleeptime ) - yield self._clock.sleep(sleeptime) + await self._clock.sleep(sleeptime) # We have to keep 2 free slots for presence and rr_edus limit = MAX_EDUS_PER_TRANSACTION - 2 - device_update_edus, dev_list_id = yield self._get_device_update_edus( + device_update_edus, dev_list_id = await self._get_device_update_edus( limit ) @@ -217,7 +221,7 @@ class PerDestinationQueue(object): ( to_device_edus, device_stream_id, - ) = yield self._get_to_device_message_edus(limit) + ) = await self._get_to_device_message_edus(limit) pending_edus = device_update_edus + to_device_edus @@ -284,7 +288,7 @@ class PerDestinationQueue(object): # END CRITICAL SECTION - success = yield self._transaction_manager.send_new_transaction( + success = await self._transaction_manager.send_new_transaction( self._destination, pending_pdus, pending_edus ) if success: @@ -295,7 +299,7 @@ class PerDestinationQueue(object): # Remove the acknowledged device messages from the database # Only bother if we actually sent some device messages if to_device_edus: - yield self._store.delete_device_msgs_for_remote( + await self._store.delete_device_msgs_for_remote( self._destination, device_stream_id ) @@ -304,7 +308,7 @@ class PerDestinationQueue(object): logger.info( "Marking as sent %r %r", self._destination, dev_list_id ) - yield self._store.mark_as_sent_devices_by_remote( + await self._store.mark_as_sent_devices_by_remote( self._destination, dev_list_id ) @@ -349,7 +353,7 @@ class PerDestinationQueue(object): # We want to be *very* sure we clear this after we stop processing self.transmission_loop_running = False - def _get_rr_edus(self, force_flush): + def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]: if not self._pending_rrs: return if not force_flush and not self._rrs_pending_flush: @@ -366,17 +370,16 @@ class PerDestinationQueue(object): self._rrs_pending_flush = False yield edu - def _pop_pending_edus(self, limit): + def _pop_pending_edus(self, limit: int) -> List[Edu]: pending_edus = self._pending_edus pending_edus, self._pending_edus = pending_edus[:limit], pending_edus[limit:] return pending_edus - @defer.inlineCallbacks - def _get_device_update_edus(self, limit): + async def _get_device_update_edus(self, limit: int) -> Tuple[List[Edu], int]: last_device_list = self._last_device_list_stream_id # Retrieve list of new device updates to send to the destination - now_stream_id, results = yield self._store.get_device_updates_by_remote( + now_stream_id, results = await self._store.get_device_updates_by_remote( self._destination, last_device_list, limit=limit ) edus = [ @@ -393,11 +396,10 @@ class PerDestinationQueue(object): return (edus, now_stream_id) - @defer.inlineCallbacks - def _get_to_device_message_edus(self, limit): + async def _get_to_device_message_edus(self, limit: int) -> Tuple[List[Edu], int]: last_device_stream_id = self._last_device_stream_id to_device_stream_id = self._store.get_to_device_stream_token() - contents, stream_id = yield self._store.get_new_device_msgs_for_remote( + contents, stream_id = await self._store.get_new_device_msgs_for_remote( self._destination, last_device_stream_id, to_device_stream_id, limit ) edus = [ diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index ca558fa242..7d3cc4a6c2 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -13,14 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import List from canonicaljson import json -from twisted.internet import defer - +import synapse.server from synapse.api.errors import HttpResponseException +from synapse.events import EventBase from synapse.federation.persistence import TransactionActions -from synapse.federation.units import Transaction +from synapse.federation.units import Edu, Transaction from synapse.logging.opentracing import ( extract_text_map, set_tag, @@ -39,7 +40,7 @@ class TransactionManager(object): shared between PerDestinationQueue objects """ - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self._server_name = hs.hostname self.clock = hs.get_clock() # nb must be called this for @measure_func self._store = hs.get_datastore() @@ -54,8 +55,9 @@ class TransactionManager(object): self.deprioritise_transmission = False @measure_func("_send_new_transaction") - @defer.inlineCallbacks - def send_new_transaction(self, destination, pending_pdus, pending_edus): + async def send_new_transaction( + self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu] + ): # Make a transaction-sending opentracing span. This span follows on from # all the edus in that transaction. This needs to be done since there is @@ -131,7 +133,7 @@ class TransactionManager(object): return data try: - response = yield self._transport_layer.send_transaction( + response = await self._transport_layer.send_transaction( transaction, json_data_cb ) code = 200 diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 125eadd796..92a9ae2320 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -158,7 +158,7 @@ class Authenticator(object): origin, json_request, now, "Incoming request" ) - logger.info("Request from %s", origin) + logger.debug("Request from %s", origin) request.authenticated_entity = origin # If we get a valid signed request from the other side, its probably @@ -579,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet): # state resolution algorithm, and we don't use that for processing # invites content = await self.handler.on_invite_request( - origin, content, room_version=RoomVersions.V1.identifier + origin, content, room_version_id=RoomVersions.V1.identifier ) # V1 federation API is defined to return a content of `[200, {...}]` @@ -606,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet): event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state content = await self.handler.on_invite_request( - origin, event, room_version=room_version + origin, event, room_version_id=room_version ) return 200, content diff --git a/synapse/federation/units.py b/synapse/federation/units.py index b4d743cde7..6b32e0dcbf 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -19,11 +19,15 @@ server protocol. import logging +import attr + +from synapse.types import JsonDict from synapse.util.jsonobject import JsonEncodedObject logger = logging.getLogger(__name__) +@attr.s(slots=True) class Edu(JsonEncodedObject): """ An Edu represents a piece of data sent from one homeserver to another. @@ -32,11 +36,24 @@ class Edu(JsonEncodedObject): internal ID or previous references graph. """ - valid_keys = ["origin", "destination", "edu_type", "content"] + edu_type = attr.ib(type=str) + content = attr.ib(type=dict) + origin = attr.ib(type=str) + destination = attr.ib(type=str) - required_keys = ["edu_type"] + def get_dict(self) -> JsonDict: + return { + "edu_type": self.edu_type, + "content": self.content, + } - internal_keys = ["origin", "destination"] + def get_internal_dict(self) -> JsonDict: + return { + "edu_type": self.edu_type, + "content": self.content, + "origin": self.origin, + "destination": self.destination, + } def get_context(self): return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 9205865231..f3c0aeceb6 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -58,8 +58,10 @@ class AdminHandler(BaseHandler): ret = await self.store.get_user_by_id(user.to_string()) if ret: profile = await self.store.get_profileinfo(user.localpart) + threepids = await self.store.user_get_threepids(user.to_string()) ret["displayname"] = profile.display_name ret["avatar_url"] = profile.avatar_url + ret["threepids"] = threepids return ret async def export_user_data(self, user_id, writer): diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 54a71c49d2..48a88d3c2a 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -816,6 +816,14 @@ class AuthHandler(BaseHandler): @defer.inlineCallbacks def add_threepid(self, user_id, medium, address, validated_at): + # check if medium has a valid value + if medium not in ["email", "msisdn"]: + raise SynapseError( + code=400, + msg=("'%s' is not a valid value for 'medium'" % (medium,)), + errcode=Codes.INVALID_PARAM, + ) + # 'Canonicalise' email addresses down to lower case. # We've now moving towards the homeserver being the entity that # is responsible for validating threepids used for resetting passwords diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 6d8e48ed39..50cea3f378 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -26,6 +26,7 @@ from synapse.api.errors import ( FederationDeniedError, HttpResponseException, RequestSendFailed, + SynapseError, ) from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.types import RoomStreamToken, get_domain_from_id @@ -39,6 +40,8 @@ from ._base import BaseHandler logger = logging.getLogger(__name__) +MAX_DEVICE_DISPLAY_NAME_LEN = 100 + class DeviceWorkerHandler(BaseHandler): def __init__(self, hs): @@ -404,9 +407,18 @@ class DeviceHandler(DeviceWorkerHandler): defer.Deferred: """ + # Reject a new displayname which is too long. + new_display_name = content.get("display_name") + if new_display_name and len(new_display_name) > MAX_DEVICE_DISPLAY_NAME_LEN: + raise SynapseError( + 400, + "Device display name is too long (max %i)" + % (MAX_DEVICE_DISPLAY_NAME_LEN,), + ) + try: yield self.store.update_device( - user_id, device_id, new_display_name=content.get("display_name") + user_id, device_id, new_display_name=new_display_name ) yield self.notify_device_update(user_id, [device_id]) except errors.StoreError as e: diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py index 8c5980cb0c..f718388884 100644 --- a/synapse/handlers/directory.py +++ b/synapse/handlers/directory.py @@ -81,13 +81,7 @@ class DirectoryHandler(BaseHandler): @defer.inlineCallbacks def create_association( - self, - requester, - room_alias, - room_id, - servers=None, - send_event=True, - check_membership=True, + self, requester, room_alias, room_id, servers=None, check_membership=True, ): """Attempt to create a new alias @@ -97,7 +91,6 @@ class DirectoryHandler(BaseHandler): room_id (str) servers (list[str]|None): List of servers that others servers should try and join via - send_event (bool): Whether to send an updated m.room.aliases event check_membership (bool): Whether to check if the user is in the room before the alias can be set (if the server's config requires it). @@ -150,16 +143,9 @@ class DirectoryHandler(BaseHandler): ) yield self._create_association(room_alias, room_id, servers, creator=user_id) - if send_event: - try: - yield self.send_room_alias_update_event(requester, room_id) - except AuthError as e: - # sending the aliases event may fail due to the user not having - # permission in the room; this is permitted. - logger.info("Skipping updating aliases event due to auth error %s", e) @defer.inlineCallbacks - def delete_association(self, requester, room_alias, send_event=True): + def delete_association(self, requester, room_alias): """Remove an alias from the directory (this is only meant for human users; AS users should call @@ -168,9 +154,6 @@ class DirectoryHandler(BaseHandler): Args: requester (Requester): room_alias (RoomAlias): - send_event (bool): Whether to send an updated m.room.aliases event. - Note that, if we delete the canonical alias, we will always attempt - to send an m.room.canonical_alias event Returns: Deferred[unicode]: room id that the alias used to point to @@ -206,9 +189,6 @@ class DirectoryHandler(BaseHandler): room_id = yield self._delete_association(room_alias) try: - if send_event: - yield self.send_room_alias_update_event(requester, room_id) - yield self._update_canonical_alias( requester, requester.user.to_string(), room_id, room_alias ) @@ -319,25 +299,50 @@ class DirectoryHandler(BaseHandler): @defer.inlineCallbacks def _update_canonical_alias(self, requester, user_id, room_id, room_alias): + """ + Send an updated canonical alias event if the removed alias was set as + the canonical alias or listed in the alt_aliases field. + """ alias_event = yield self.state.get_current_state( room_id, EventTypes.CanonicalAlias, "" ) - alias_str = room_alias.to_string() - if not alias_event or alias_event.content.get("alias", "") != alias_str: + # There is no canonical alias, nothing to do. + if not alias_event: return - yield self.event_creation_handler.create_and_send_nonmember_event( - requester, - { - "type": EventTypes.CanonicalAlias, - "state_key": "", - "room_id": room_id, - "sender": user_id, - "content": {}, - }, - ratelimit=False, - ) + # Obtain a mutable version of the event content. + content = dict(alias_event.content) + send_update = False + + # Remove the alias property if it matches the removed alias. + alias_str = room_alias.to_string() + if alias_event.content.get("alias", "") == alias_str: + send_update = True + content.pop("alias", "") + + # Filter alt_aliases for the removed alias. + alt_aliases = content.pop("alt_aliases", None) + # If the aliases are not a list (or not found) do not attempt to modify + # the list. + if isinstance(alt_aliases, list): + send_update = True + alt_aliases = [alias for alias in alt_aliases if alias != alias_str] + if alt_aliases: + content["alt_aliases"] = alt_aliases + + if send_update: + yield self.event_creation_handler.create_and_send_nonmember_event( + requester, + { + "type": EventTypes.CanonicalAlias, + "state_key": "", + "room_id": room_id, + "sender": user_id, + "content": content, + }, + ratelimit=False, + ) @defer.inlineCallbacks def get_association_from_room_alias(self, room_alias): diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index e9441bbeff..eb20ef4aec 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -65,7 +65,7 @@ from synapse.replication.http.federation import ( from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour -from synapse.types import StateMap, UserID, get_domain_from_id +from synapse.types import JsonDict, StateMap, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.distributor import user_joined_room from synapse.util.retryutils import NotRetryingDestination @@ -1156,7 +1156,7 @@ class FederationHandler(BaseHandler): Logs a warning if we can't find the given event. """ - room_version = await self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version(room_id) event_infos = [] @@ -1230,13 +1230,12 @@ class FederationHandler(BaseHandler): ) raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events") - @defer.inlineCallbacks - def send_invite(self, target_host, event): + async def send_invite(self, target_host, event): """ Sends the invite to the remote server for signing. Invites must be signed by the invitee's server before distribution. """ - pdu = yield self.federation_client.send_invite( + pdu = await self.federation_client.send_invite( destination=target_host, room_id=event.room_id, event_id=event.event_id, @@ -1245,17 +1244,16 @@ class FederationHandler(BaseHandler): return pdu - @defer.inlineCallbacks - def on_event_auth(self, event_id): - event = yield self.store.get_event(event_id) - auth = yield self.store.get_auth_chain( + async def on_event_auth(self, event_id: str) -> List[EventBase]: + event = await self.store.get_event(event_id) + auth = await self.store.get_auth_chain( [auth_id for auth_id in event.auth_event_ids()], include_given=True ) - return [e for e in auth] + return list(auth) - @log_function - @defer.inlineCallbacks - def do_invite_join(self, target_hosts, room_id, joinee, content): + async def do_invite_join( + self, target_hosts: Iterable[str], room_id: str, joinee: str, content: JsonDict + ) -> None: """ Attempts to join the `joinee` to the room `room_id` via the servers contained in `target_hosts`. @@ -1268,17 +1266,17 @@ class FederationHandler(BaseHandler): have finished processing the join. Args: - target_hosts (Iterable[str]): List of servers to attempt to join the room with. + target_hosts: List of servers to attempt to join the room with. - room_id (str): The ID of the room to join. + room_id: The ID of the room to join. - joinee (str): The User ID of the joining user. + joinee: The User ID of the joining user. - content (dict): The event content to use for the join event. + content: The event content to use for the join event. """ logger.debug("Joining %s to %s", joinee, room_id) - origin, event, room_version_obj = yield self._make_and_verify_event( + origin, event, room_version_obj = await self._make_and_verify_event( target_hosts, room_id, joinee, @@ -1294,7 +1292,7 @@ class FederationHandler(BaseHandler): self.room_queues[room_id] = [] - yield self._clean_room_for_join(room_id) + await self._clean_room_for_join(room_id) handled_events = set() @@ -1307,9 +1305,8 @@ class FederationHandler(BaseHandler): except ValueError: pass - event_format_version = room_version_obj.event_format - ret = yield self.federation_client.send_join( - target_hosts, event, event_format_version + ret = await self.federation_client.send_join( + target_hosts, event, room_version_obj ) origin = ret["origin"] @@ -1327,7 +1324,7 @@ class FederationHandler(BaseHandler): logger.debug("do_invite_join event: %s", event) try: - yield self.store.store_room( + await self.store.store_room( room_id=room_id, room_creator_user_id="", is_public=False, @@ -1337,13 +1334,13 @@ class FederationHandler(BaseHandler): # FIXME pass - yield self._persist_auth_tree( + await self._persist_auth_tree( origin, auth_chain, state, event, room_version_obj ) # Check whether this room is the result of an upgrade of a room we already know # about. If so, migrate over user information - predecessor = yield self.store.get_room_predecessor(room_id) + predecessor = await self.store.get_room_predecessor(room_id) if not predecessor or not isinstance(predecessor.get("room_id"), str): return old_room_id = predecessor["room_id"] @@ -1353,7 +1350,7 @@ class FederationHandler(BaseHandler): # We retrieve the room member handler here as to not cause a cyclic dependency member_handler = self.hs.get_room_member_handler() - yield member_handler.transfer_room_state_on_room_upgrade( + await member_handler.transfer_room_state_on_room_upgrade( old_room_id, room_id ) @@ -1370,8 +1367,6 @@ class FederationHandler(BaseHandler): run_in_background(self._handle_queued_pdus, room_queue) - return True - async def _handle_queued_pdus(self, room_queue): """Process PDUs which got queued up while we were busy send_joining. @@ -1394,20 +1389,17 @@ class FederationHandler(BaseHandler): "Error handling queued PDU %s from %s: %s", p.event_id, origin, e ) - @defer.inlineCallbacks - @log_function - def on_make_join_request(self, origin, room_id, user_id): + async def on_make_join_request( + self, origin: str, room_id: str, user_id: str + ) -> EventBase: """ We've received a /make_join/ request, so we create a partial join event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. Args: - origin (str): The (verified) server name of the requesting server. - room_id (str): Room to create join event in - user_id (str): The user to create the join for - - Returns: - Deferred[FrozenEvent] + origin: The (verified) server name of the requesting server. + room_id: Room to create join event in + user_id: The user to create the join for """ if get_domain_from_id(user_id) != origin: logger.info( @@ -1419,7 +1411,7 @@ class FederationHandler(BaseHandler): event_content = {"membership": Membership.JOIN} - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new( room_version, @@ -1433,14 +1425,14 @@ class FederationHandler(BaseHandler): ) try: - event, context = yield self.event_creation_handler.create_new_client_event( + event, context = await self.event_creation_handler.create_new_client_event( builder=builder ) except AuthError as e: logger.warning("Failed to create join to %s because %s", room_id, e) raise e - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1451,15 +1443,13 @@ class FederationHandler(BaseHandler): # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` - yield self.auth.check_from_context( + await self.auth.check_from_context( room_version, event, context, do_sig_check=False ) return event - @defer.inlineCallbacks - @log_function - def on_send_join_request(self, origin, pdu): + async def on_send_join_request(self, origin, pdu): """ We have received a join event for a room. Fully process it and respond with the current state and auth chains. """ @@ -1496,9 +1486,9 @@ class FederationHandler(BaseHandler): # would introduce the danger of backwards-compatibility problems. event.internal_metadata.send_on_behalf_of = origin - context = yield self._handle_new_event(origin, event) + context = await self._handle_new_event(origin, event) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1516,19 +1506,18 @@ class FederationHandler(BaseHandler): if event.type == EventTypes.Member: if event.content["membership"] == Membership.JOIN: user = UserID.from_string(event.state_key) - yield self.user_joined_room(user, event.room_id) + await self.user_joined_room(user, event.room_id) - prev_state_ids = yield context.get_prev_state_ids() + prev_state_ids = await context.get_prev_state_ids() state_ids = list(prev_state_ids.values()) - auth_chain = yield self.store.get_auth_chain(state_ids) + auth_chain = await self.store.get_auth_chain(state_ids) - state = yield self.store.get_events(list(prev_state_ids.values())) + state = await self.store.get_events(list(prev_state_ids.values())) return {"state": list(state.values()), "auth_chain": auth_chain} - @defer.inlineCallbacks - def on_invite_request( + async def on_invite_request( self, origin: str, event: EventBase, room_version: RoomVersion ): """ We've got an invite event. Process and persist it. Sign it. @@ -1538,7 +1527,7 @@ class FederationHandler(BaseHandler): if event.state_key is None: raise SynapseError(400, "The invite event did not have a state key") - is_blocked = yield self.store.is_room_blocked(event.room_id) + is_blocked = await self.store.is_room_blocked(event.room_id) if is_blocked: raise SynapseError(403, "This room has been blocked on this server") @@ -1581,14 +1570,15 @@ class FederationHandler(BaseHandler): ) ) - context = yield self.state_handler.compute_event_context(event) - yield self.persist_events_and_notify([(event, context)]) + context = await self.state_handler.compute_event_context(event) + await self.persist_events_and_notify([(event, context)]) return event - @defer.inlineCallbacks - def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content): - origin, event, room_version = yield self._make_and_verify_event( + async def do_remotely_reject_invite( + self, target_hosts: Iterable[str], room_id: str, user_id: str, content: JsonDict + ) -> EventBase: + origin, event, room_version = await self._make_and_verify_event( target_hosts, room_id, user_id, "leave", content=content ) # Mark as outlier as we don't have any state for this event; we're not @@ -1604,22 +1594,27 @@ class FederationHandler(BaseHandler): except ValueError: pass - yield self.federation_client.send_leave(target_hosts, event) + await self.federation_client.send_leave(target_hosts, event) - context = yield self.state_handler.compute_event_context(event) - yield self.persist_events_and_notify([(event, context)]) + context = await self.state_handler.compute_event_context(event) + await self.persist_events_and_notify([(event, context)]) return event - @defer.inlineCallbacks - def _make_and_verify_event( - self, target_hosts, room_id, user_id, membership, content={}, params=None - ): + async def _make_and_verify_event( + self, + target_hosts: Iterable[str], + room_id: str, + user_id: str, + membership: str, + content: JsonDict = {}, + params: Optional[Dict[str, str]] = None, + ) -> Tuple[str, EventBase, RoomVersion]: ( origin, event, room_version, - ) = yield self.federation_client.make_membership_event( + ) = await self.federation_client.make_membership_event( target_hosts, room_id, user_id, membership, content, params=params ) @@ -1633,20 +1628,17 @@ class FederationHandler(BaseHandler): assert event.room_id == room_id return origin, event, room_version - @defer.inlineCallbacks - @log_function - def on_make_leave_request(self, origin, room_id, user_id): + async def on_make_leave_request( + self, origin: str, room_id: str, user_id: str + ) -> EventBase: """ We've received a /make_leave/ request, so we create a partial leave event for the room and return that. We do *not* persist or process it until the other server has signed it and sent it back. Args: - origin (str): The (verified) server name of the requesting server. - room_id (str): Room to create leave event in - user_id (str): The user to create the leave for - - Returns: - Deferred[FrozenEvent] + origin: The (verified) server name of the requesting server. + room_id: Room to create leave event in + user_id: The user to create the leave for """ if get_domain_from_id(user_id) != origin: logger.info( @@ -1656,7 +1648,7 @@ class FederationHandler(BaseHandler): ) raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) builder = self.event_builder_factory.new( room_version, { @@ -1668,11 +1660,11 @@ class FederationHandler(BaseHandler): }, ) - event, context = yield self.event_creation_handler.create_new_client_event( + event, context = await self.event_creation_handler.create_new_client_event( builder=builder ) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1684,7 +1676,7 @@ class FederationHandler(BaseHandler): try: # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_leave_request` - yield self.auth.check_from_context( + await self.auth.check_from_context( room_version, event, context, do_sig_check=False ) except AuthError as e: @@ -1693,9 +1685,7 @@ class FederationHandler(BaseHandler): return event - @defer.inlineCallbacks - @log_function - def on_send_leave_request(self, origin, pdu): + async def on_send_leave_request(self, origin, pdu): """ We have received a leave event for a room. Fully process it.""" event = pdu @@ -1715,9 +1705,9 @@ class FederationHandler(BaseHandler): event.internal_metadata.outlier = False - context = yield self._handle_new_event(origin, event) + context = await self._handle_new_event(origin, event) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -1798,6 +1788,9 @@ class FederationHandler(BaseHandler): if not in_room: raise AuthError(403, "Host not in room.") + # Synapse asks for 100 events per backfill request. Do not allow more. + limit = min(limit, 100) + events = yield self.store.get_backfill_events(room_id, pdu_list, limit) events = yield filter_events_for_server(self.storage, origin, events) @@ -1839,11 +1832,10 @@ class FederationHandler(BaseHandler): def get_min_depth_for_context(self, context): return self.store.get_min_depth(context) - @defer.inlineCallbacks - def _handle_new_event( + async def _handle_new_event( self, origin, event, state=None, auth_events=None, backfilled=False ): - context = yield self._prep_event( + context = await self._prep_event( origin, event, state=state, auth_events=auth_events, backfilled=backfilled ) @@ -1856,11 +1848,11 @@ class FederationHandler(BaseHandler): and not backfilled and not context.rejected ): - yield self.action_generator.handle_push_actions_for_event( + await self.action_generator.handle_push_actions_for_event( event, context ) - yield self.persist_events_and_notify( + await self.persist_events_and_notify( [(event, context)], backfilled=backfilled ) success = True @@ -1872,13 +1864,12 @@ class FederationHandler(BaseHandler): return context - @defer.inlineCallbacks - def _handle_new_events( + async def _handle_new_events( self, origin: str, event_infos: Iterable[_NewEventInfo], backfilled: bool = False, - ): + ) -> None: """Creates the appropriate contexts and persists events. The events should not depend on one another, e.g. this should be used to persist a bunch of outliers, but not a chunk of individual events that depend @@ -1887,11 +1878,10 @@ class FederationHandler(BaseHandler): Notifies about the events where appropriate. """ - @defer.inlineCallbacks - def prep(ev_info: _NewEventInfo): + async def prep(ev_info: _NewEventInfo): event = ev_info.event with nested_logging_context(suffix=event.event_id): - res = yield self._prep_event( + res = await self._prep_event( origin, event, state=ev_info.state, @@ -1900,14 +1890,14 @@ class FederationHandler(BaseHandler): ) return res - contexts = yield make_deferred_yieldable( + contexts = await make_deferred_yieldable( defer.gatherResults( [run_in_background(prep, ev_info) for ev_info in event_infos], consumeErrors=True, ) ) - yield self.persist_events_and_notify( + await self.persist_events_and_notify( [ (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) @@ -1915,15 +1905,14 @@ class FederationHandler(BaseHandler): backfilled=backfilled, ) - @defer.inlineCallbacks - def _persist_auth_tree( + async def _persist_auth_tree( self, origin: str, auth_events: List[EventBase], state: List[EventBase], event: EventBase, room_version: RoomVersion, - ): + ) -> None: """Checks the auth chain is valid (and passes auth checks) for the state and event. Then persists the auth chain and state atomically. Persists the event separately. Notifies about the persisted events @@ -1938,14 +1927,11 @@ class FederationHandler(BaseHandler): event room_version: The room version we expect this room to have, and will raise if it doesn't match the version in the create event. - - Returns: - Deferred """ events_to_context = {} for e in itertools.chain(auth_events, state): e.internal_metadata.outlier = True - ctx = yield self.state_handler.compute_event_context(e) + ctx = await self.state_handler.compute_event_context(e) events_to_context[e.event_id] = ctx event_map = { @@ -1977,12 +1963,8 @@ class FederationHandler(BaseHandler): missing_auth_events.add(e_id) for e_id in missing_auth_events: - m_ev = yield self.federation_client.get_pdu( - [origin], - e_id, - room_version=room_version.identifier, - outlier=True, - timeout=10000, + m_ev = await self.federation_client.get_pdu( + [origin], e_id, room_version=room_version, outlier=True, timeout=10000, ) if m_ev and m_ev.event_id == e_id: event_map[e_id] = m_ev @@ -2013,91 +1995,74 @@ class FederationHandler(BaseHandler): raise events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR - yield self.persist_events_and_notify( + await self.persist_events_and_notify( [ (e, events_to_context[e.event_id]) for e in itertools.chain(auth_events, state) ] ) - new_event_context = yield self.state_handler.compute_event_context( + new_event_context = await self.state_handler.compute_event_context( event, old_state=state ) - yield self.persist_events_and_notify([(event, new_event_context)]) + await self.persist_events_and_notify([(event, new_event_context)]) - @defer.inlineCallbacks - def _prep_event( + async def _prep_event( self, origin: str, event: EventBase, state: Optional[Iterable[EventBase]], auth_events: Optional[StateMap[EventBase]], backfilled: bool, - ): - """ - - Args: - origin: - event: - state: - auth_events: - backfilled: - - Returns: - Deferred, which resolves to synapse.events.snapshot.EventContext - """ - context = yield self.state_handler.compute_event_context(event, old_state=state) + ) -> EventContext: + context = await self.state_handler.compute_event_context(event, old_state=state) if not auth_events: - prev_state_ids = yield context.get_prev_state_ids() - auth_events_ids = yield self.auth.compute_auth_events( + prev_state_ids = await context.get_prev_state_ids() + auth_events_ids = await self.auth.compute_auth_events( event, prev_state_ids, for_verification=True ) - auth_events = yield self.store.get_events(auth_events_ids) + auth_events = await self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events.values()} # This is a hack to fix some old rooms where the initial join event # didn't reference the create event in its auth events. if event.type == EventTypes.Member and not event.auth_event_ids(): if len(event.prev_event_ids()) == 1 and event.depth < 5: - c = yield self.store.get_event( + c = await self.store.get_event( event.prev_event_ids()[0], allow_none=True ) if c and c.type == EventTypes.Create: auth_events[(c.type, c.state_key)] = c - context = yield self.do_auth(origin, event, context, auth_events=auth_events) + context = await self.do_auth(origin, event, context, auth_events=auth_events) if not context.rejected: - yield self._check_for_soft_fail(event, state, backfilled) + await self._check_for_soft_fail(event, state, backfilled) if event.type == EventTypes.GuestAccess and not context.rejected: - yield self.maybe_kick_guest_users(event) + await self.maybe_kick_guest_users(event) return context - @defer.inlineCallbacks - def _check_for_soft_fail( + async def _check_for_soft_fail( self, event: EventBase, state: Optional[Iterable[EventBase]], backfilled: bool - ): - """Checks if we should soft fail the event, if so marks the event as + ) -> None: + """Checks if we should soft fail the event; if so, marks the event as such. Args: event state: The state at the event if we don't have all the event's prev events backfilled: Whether the event is from backfill - - Returns: - Deferred """ # For new (non-backfilled and non-outlier) events we check if the event # passes auth based on the current state. If it doesn't then we # "soft-fail" the event. do_soft_fail_check = not backfilled and not event.internal_metadata.is_outlier() if do_soft_fail_check: - extrem_ids = yield self.store.get_latest_event_ids_in_room(event.room_id) + extrem_ids = await self.store.get_latest_event_ids_in_room(event.room_id) extrem_ids = set(extrem_ids) prev_event_ids = set(event.prev_event_ids()) @@ -2108,7 +2073,7 @@ class FederationHandler(BaseHandler): do_soft_fail_check = False if do_soft_fail_check: - room_version = yield self.store.get_room_version_id(event.room_id) + room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] # Calculate the "current state". @@ -2125,19 +2090,19 @@ class FederationHandler(BaseHandler): # given state at the event. This should correctly handle cases # like bans, especially with state res v2. - state_sets = yield self.state_store.get_state_groups( + state_sets = await self.state_store.get_state_groups( event.room_id, extrem_ids ) state_sets = list(state_sets.values()) state_sets.append(state) - current_state_ids = yield self.state_handler.resolve_events( + current_state_ids = await self.state_handler.resolve_events( room_version, state_sets, event ) current_state_ids = { k: e.event_id for k, e in iteritems(current_state_ids) } else: - current_state_ids = yield self.state_handler.get_current_state_ids( + current_state_ids = await self.state_handler.get_current_state_ids( event.room_id, latest_event_ids=extrem_ids ) @@ -2153,7 +2118,7 @@ class FederationHandler(BaseHandler): e for k, e in iteritems(current_state_ids) if k in auth_types ] - current_auth_events = yield self.store.get_events(current_state_ids) + current_auth_events = await self.store.get_events(current_state_ids) current_auth_events = { (e.type, e.state_key): e for e in current_auth_events.values() } @@ -2166,15 +2131,14 @@ class FederationHandler(BaseHandler): logger.warning("Soft-failing %r because %s", event, e) event.internal_metadata.soft_failed = True - @defer.inlineCallbacks - def on_query_auth( + async def on_query_auth( self, origin, event_id, room_id, remote_auth_chain, rejects, missing ): - in_room = yield self.auth.check_host_in_room(room_id, origin) + in_room = await self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") - event = yield self.store.get_event( + event = await self.store.get_event( event_id, allow_none=False, check_room_id=room_id ) @@ -2182,57 +2146,61 @@ class FederationHandler(BaseHandler): # don't want to fall into the trap of `missing` being wrong. for e in remote_auth_chain: try: - yield self._handle_new_event(origin, e) + await self._handle_new_event(origin, e) except AuthError: pass # Now get the current auth_chain for the event. - local_auth_chain = yield self.store.get_auth_chain( + local_auth_chain = await self.store.get_auth_chain( [auth_id for auth_id in event.auth_event_ids()], include_given=True ) # TODO: Check if we would now reject event_id. If so we need to tell # everyone. - ret = yield self.construct_auth_difference(local_auth_chain, remote_auth_chain) + ret = await self.construct_auth_difference(local_auth_chain, remote_auth_chain) logger.debug("on_query_auth returning: %s", ret) return ret - @defer.inlineCallbacks - def on_get_missing_events( + async def on_get_missing_events( self, origin, room_id, earliest_events, latest_events, limit ): - in_room = yield self.auth.check_host_in_room(room_id, origin) + in_room = await self.auth.check_host_in_room(room_id, origin) if not in_room: raise AuthError(403, "Host not in room.") + # Only allow up to 20 events to be retrieved per request. limit = min(limit, 20) - missing_events = yield self.store.get_missing_events( + missing_events = await self.store.get_missing_events( room_id=room_id, earliest_events=earliest_events, latest_events=latest_events, limit=limit, ) - missing_events = yield filter_events_for_server( + missing_events = await filter_events_for_server( self.storage, origin, missing_events ) return missing_events - @defer.inlineCallbacks - @log_function - def do_auth(self, origin, event, context, auth_events): + async def do_auth( + self, + origin: str, + event: EventBase, + context: EventContext, + auth_events: StateMap[EventBase], + ) -> EventContext: """ Args: - origin (str): - event (synapse.events.EventBase): - context (synapse.events.snapshot.EventContext): - auth_events (dict[(str, str)->synapse.events.EventBase]): + origin: + event: + context: + auth_events: Map from (event_type, state_key) to event Normally, our calculated auth_events based on the state of the room @@ -2242,13 +2210,13 @@ class FederationHandler(BaseHandler): Also NB that this function adds entries to it. Returns: - defer.Deferred[EventContext]: updated context object + updated context object """ - room_version = yield self.store.get_room_version_id(event.room_id) + room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] try: - context = yield self._update_auth_events_and_context_for_auth( + context = await self._update_auth_events_and_context_for_auth( origin, event, context, auth_events ) except Exception: @@ -2270,10 +2238,13 @@ class FederationHandler(BaseHandler): return context - @defer.inlineCallbacks - def _update_auth_events_and_context_for_auth( - self, origin, event, context, auth_events - ): + async def _update_auth_events_and_context_for_auth( + self, + origin: str, + event: EventBase, + context: EventContext, + auth_events: StateMap[EventBase], + ) -> EventContext: """Helper for do_auth. See there for docs. Checks whether a given event has the expected auth events. If it @@ -2281,16 +2252,16 @@ class FederationHandler(BaseHandler): we can come to a consensus (e.g. if one server missed some valid state). - This attempts to resovle any potential divergence of state between + This attempts to resolve any potential divergence of state between servers, but is not essential and so failures should not block further processing of the event. Args: - origin (str): - event (synapse.events.EventBase): - context (synapse.events.snapshot.EventContext): + origin: + event: + context: - auth_events (dict[(str, str)->synapse.events.EventBase]): + auth_events: Map from (event_type, state_key) to event Normally, our calculated auth_events based on the state of the room @@ -2301,7 +2272,7 @@ class FederationHandler(BaseHandler): Also NB that this function adds entries to it. Returns: - defer.Deferred[EventContext]: updated context + updated context """ event_auth_events = set(event.auth_event_ids()) @@ -2315,7 +2286,7 @@ class FederationHandler(BaseHandler): # # we start by checking if they are in the store, and then try calling /event_auth/. if missing_auth: - have_events = yield self.store.have_seen_events(missing_auth) + have_events = await self.store.have_seen_events(missing_auth) logger.debug("Events %s are in the store", have_events) missing_auth.difference_update(have_events) @@ -2324,7 +2295,7 @@ class FederationHandler(BaseHandler): logger.info("auth_events contains unknown events: %s", missing_auth) try: try: - remote_auth_chain = yield self.federation_client.get_event_auth( + remote_auth_chain = await self.federation_client.get_event_auth( origin, event.room_id, event.event_id ) except RequestSendFailed as e: @@ -2333,7 +2304,7 @@ class FederationHandler(BaseHandler): logger.info("Failed to get event auth from remote: %s", e) return context - seen_remotes = yield self.store.have_seen_events( + seen_remotes = await self.store.have_seen_events( [e.event_id for e in remote_auth_chain] ) @@ -2356,7 +2327,7 @@ class FederationHandler(BaseHandler): logger.debug( "do_auth %s missing_auth: %s", event.event_id, e.event_id ) - yield self._handle_new_event(origin, e, auth_events=auth) + await self._handle_new_event(origin, e, auth_events=auth) if e.event_id in event_auth_events: auth_events[(e.type, e.state_key)] = e @@ -2390,7 +2361,7 @@ class FederationHandler(BaseHandler): # XXX: currently this checks for redactions but I'm not convinced that is # necessary? - different_events = yield self.store.get_events_as_list(different_auth) + different_events = await self.store.get_events_as_list(different_auth) for d in different_events: if d.room_id != event.room_id: @@ -2416,8 +2387,8 @@ class FederationHandler(BaseHandler): remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) remote_state = remote_auth_events.values() - room_version = yield self.store.get_room_version_id(event.room_id) - new_state = yield self.state_handler.resolve_events( + room_version = await self.store.get_room_version_id(event.room_id) + new_state = await self.state_handler.resolve_events( room_version, (local_state, remote_state), event ) @@ -2432,27 +2403,27 @@ class FederationHandler(BaseHandler): auth_events.update(new_state) - context = yield self._update_context_for_auth_events( + context = await self._update_context_for_auth_events( event, context, auth_events ) return context - @defer.inlineCallbacks - def _update_context_for_auth_events(self, event, context, auth_events): + async def _update_context_for_auth_events( + self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] + ) -> EventContext: """Update the state_ids in an event context after auth event resolution, storing the changes as a new state group. Args: - event (Event): The event we're handling the context for + event: The event we're handling the context for - context (synapse.events.snapshot.EventContext): initial event context + context: initial event context - auth_events (dict[(str, str)->EventBase]): Events to update in the event - context. + auth_events: Events to update in the event context. Returns: - Deferred[EventContext]: new event context + new event context """ # exclude the state key of the new event from the current_state in the context. if event.is_state(): @@ -2463,19 +2434,19 @@ class FederationHandler(BaseHandler): k: a.event_id for k, a in iteritems(auth_events) if k != event_key } - current_state_ids = yield context.get_current_state_ids() + current_state_ids = await context.get_current_state_ids() current_state_ids = dict(current_state_ids) current_state_ids.update(state_updates) - prev_state_ids = yield context.get_prev_state_ids() + prev_state_ids = await context.get_prev_state_ids() prev_state_ids = dict(prev_state_ids) prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)}) # create a new state group as a delta from the existing one. prev_group = context.state_group - state_group = yield self.state_store.store_state_group( + state_group = await self.state_store.store_state_group( event.event_id, event.room_id, prev_group=prev_group, @@ -2492,8 +2463,9 @@ class FederationHandler(BaseHandler): delta_ids=state_updates, ) - @defer.inlineCallbacks - def construct_auth_difference(self, local_auth, remote_auth): + async def construct_auth_difference( + self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase] + ) -> Dict: """ Given a local and remote auth chain, find the differences. This assumes that we have already processed all events in remote_auth @@ -2602,7 +2574,7 @@ class FederationHandler(BaseHandler): reason_map = {} for e in base_remote_rejected: - reason = yield self.store.get_rejection_reason(e.event_id) + reason = await self.store.get_rejection_reason(e.event_id) if reason is None: # TODO: e is not in the current state, so we should # construct some proof of that. @@ -2687,33 +2659,31 @@ class FederationHandler(BaseHandler): destinations, room_id, event_dict ) - @defer.inlineCallbacks - @log_function - def on_exchange_third_party_invite_request(self, room_id, event_dict): + async def on_exchange_third_party_invite_request( + self, room_id: str, event_dict: JsonDict + ) -> None: """Handle an exchange_third_party_invite request from a remote server The remote server will call this when it wants to turn a 3pid invite into a normal m.room.member invite. Args: - room_id (str): The ID of the room. + room_id: The ID of the room. event_dict (dict[str, Any]): Dictionary containing the event body. - Returns: - Deferred: resolves (to None) """ - room_version = yield self.store.get_room_version_id(room_id) + room_version = await self.store.get_room_version_id(room_id) # NB: event_dict has a particular specced format we might need to fudge # if we change event formats too much. builder = self.event_builder_factory.new(room_version, event_dict) - event, context = yield self.event_creation_handler.create_new_client_event( + event, context = await self.event_creation_handler.create_new_client_event( builder=builder ) - event_allowed = yield self.third_party_event_rules.check_event_allowed( + event_allowed = await self.third_party_event_rules.check_event_allowed( event, context ) if not event_allowed: @@ -2724,16 +2694,16 @@ class FederationHandler(BaseHandler): 403, "This event is not allowed in this context", Codes.FORBIDDEN ) - event, context = yield self.add_display_name_to_third_party_invite( + event, context = await self.add_display_name_to_third_party_invite( room_version, event_dict, event, context ) try: - yield self.auth.check_from_context(room_version, event, context) + await self.auth.check_from_context(room_version, event, context) except AuthError as e: logger.warning("Denying third party invite %r because %s", event, e) raise e - yield self._check_signature(event, context) + await self._check_signature(event, context) # We need to tell the transaction queue to send this out, even # though the sender isn't a local user. @@ -2741,7 +2711,7 @@ class FederationHandler(BaseHandler): # We retrieve the room member handler here as to not cause a cyclic dependency member_handler = self.hs.get_room_member_handler() - yield member_handler.send_membership_event(None, event, context) + await member_handler.send_membership_event(None, event, context) @defer.inlineCallbacks def add_display_name_to_third_party_invite( @@ -2889,27 +2859,27 @@ class FederationHandler(BaseHandler): if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") - @defer.inlineCallbacks - def persist_events_and_notify(self, event_and_contexts, backfilled=False): + async def persist_events_and_notify( + self, + event_and_contexts: Sequence[Tuple[EventBase, EventContext]], + backfilled: bool = False, + ) -> None: """Persists events and tells the notifier/pushers about them, if necessary. Args: - event_and_contexts(list[tuple[FrozenEvent, EventContext]]) - backfilled (bool): Whether these events are a result of + event_and_contexts: + backfilled: Whether these events are a result of backfilling or not - - Returns: - Deferred """ if self.config.worker_app: - yield self._send_events_to_master( + await self._send_events_to_master( store=self.store, event_and_contexts=event_and_contexts, backfilled=backfilled, ) else: - max_stream_id = yield self.storage.persistence.persist_events( + max_stream_id = await self.storage.persistence.persist_events( event_and_contexts, backfilled=backfilled ) @@ -2920,15 +2890,17 @@ class FederationHandler(BaseHandler): if not backfilled: # Never notify for backfilled events for event, _ in event_and_contexts: - yield self._notify_persisted_event(event, max_stream_id) + await self._notify_persisted_event(event, max_stream_id) - def _notify_persisted_event(self, event, max_stream_id): + async def _notify_persisted_event( + self, event: EventBase, max_stream_id: int + ) -> None: """Checks to see if notifier/pushers should be notified about the event or not. Args: - event (FrozenEvent) - max_stream_id (int): The max_stream_id returned by persist_events + event: + max_stream_id: The max_stream_id returned by persist_events """ extra_users = [] @@ -2952,29 +2924,29 @@ class FederationHandler(BaseHandler): event, event_stream_id, max_stream_id, extra_users=extra_users ) - return self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) + await self.pusher_pool.on_new_notifications(event_stream_id, max_stream_id) - def _clean_room_for_join(self, room_id): + async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the server to join the room. Args: - room_id (str) + room_id """ if self.config.worker_app: - return self._clean_room_for_join_client(room_id) + await self._clean_room_for_join_client(room_id) else: - return self.store.clean_room_for_join(room_id) + await self.store.clean_room_for_join(room_id) - def user_joined_room(self, user, room_id): + async def user_joined_room(self, user: UserID, room_id: str) -> None: """Called when a new user has joined the room """ if self.config.worker_app: - return self._notify_user_membership_change( + await self._notify_user_membership_change( room_id=room_id, user_id=user.to_string(), change="joined" ) else: - return defer.succeed(user_joined_room(self.distributor, user, room_id)) + user_joined_room(self.distributor, user, room_id) @defer.inlineCallbacks def get_room_complexity(self, remote_room_hosts, room_id): diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 9743c0e0d7..e573392aba 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -932,10 +932,9 @@ class EventCreationHandler(object): # way? If we have been invited by a remote server, we need # to get them to sign the event. - returned_invite = yield federation_handler.send_invite( - invitee.domain, event + returned_invite = yield defer.ensureDeferred( + federation_handler.send_invite(invitee.domain, event) ) - event.unsigned.pop("room_state", None) # TODO: Make sure the signatures actually are correct. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b609a65f47..49ec2f48bc 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -64,18 +64,21 @@ class RoomCreationHandler(BaseHandler): "history_visibility": "shared", "original_invitees_have_ops": False, "guest_can_join": True, + "power_level_content_override": {"invite": 0}, }, RoomCreationPreset.TRUSTED_PRIVATE_CHAT: { "join_rules": JoinRules.INVITE, "history_visibility": "shared", "original_invitees_have_ops": True, "guest_can_join": True, + "power_level_content_override": {"invite": 0}, }, RoomCreationPreset.PUBLIC_CHAT: { "join_rules": JoinRules.PUBLIC, "history_visibility": "shared", "original_invitees_have_ops": False, "guest_can_join": False, + "power_level_content_override": {}, }, } @@ -259,7 +262,7 @@ class RoomCreationHandler(BaseHandler): for v in ("invite", "events_default"): current = int(pl_content.get(v, 0)) if current < restricted_level: - logger.info( + logger.debug( "Setting level for %s in %s to %i (was %i)", v, old_room_id, @@ -269,7 +272,7 @@ class RoomCreationHandler(BaseHandler): pl_content[v] = restricted_level updated = True else: - logger.info("Not setting level for %s (already %i)", v, current) + logger.debug("Not setting level for %s (already %i)", v, current) if updated: try: @@ -296,7 +299,7 @@ class RoomCreationHandler(BaseHandler): EventTypes.Aliases, events_default ) - logger.info("Setting correct PLs in new room to %s", new_pl_content) + logger.debug("Setting correct PLs in new room to %s", new_pl_content) yield self.event_creation_handler.create_and_send_nonmember_event( requester, { @@ -475,9 +478,7 @@ class RoomCreationHandler(BaseHandler): for alias_str in aliases: alias = RoomAlias.from_string(alias_str) try: - yield directory_handler.delete_association( - requester, alias, send_event=False - ) + yield directory_handler.delete_association(requester, alias) removed_aliases.append(alias_str) except SynapseError as e: logger.warning("Unable to remove alias %s from old room: %s", alias, e) @@ -508,7 +509,6 @@ class RoomCreationHandler(BaseHandler): RoomAlias.from_string(alias), new_room_id, servers=(self.hs.hostname,), - send_event=False, check_membership=False, ) logger.info("Moved alias %s to new room", alias) @@ -579,9 +579,13 @@ class RoomCreationHandler(BaseHandler): # Check whether the third party rules allows/changes the room create # request. - yield self.third_party_event_rules.on_create_room( + event_allowed = yield self.third_party_event_rules.on_create_room( requester, config, is_requester_admin=is_requester_admin ) + if not event_allowed: + raise SynapseError( + 403, "You are not permitted to create rooms", Codes.FORBIDDEN + ) if not is_requester_admin and not self.spam_checker.user_may_create_room( user_id @@ -657,7 +661,6 @@ class RoomCreationHandler(BaseHandler): room_id=room_id, room_alias=room_alias, servers=[self.hs.hostname], - send_event=False, check_membership=False, ) @@ -782,7 +785,7 @@ class RoomCreationHandler(BaseHandler): @defer.inlineCallbacks def send(etype, content, **kwargs): event = create(etype, content, **kwargs) - logger.info("Sending %s in new room", etype) + logger.debug("Sending %s in new room", etype) yield self.event_creation_handler.create_and_send_nonmember_event( creator, event, ratelimit=False ) @@ -796,7 +799,7 @@ class RoomCreationHandler(BaseHandler): creation_content.update({"creator": creator_id}) yield send(etype=EventTypes.Create, content=creation_content) - logger.info("Sending %s in new room", EventTypes.Member) + logger.debug("Sending %s in new room", EventTypes.Member) yield self.room_member_handler.update_membership( creator, creator.user, @@ -825,19 +828,24 @@ class RoomCreationHandler(BaseHandler): # This will be reudundant on pre-MSC2260 rooms, since the # aliases event is special-cased. EventTypes.Aliases: 0, + EventTypes.Tombstone: 100, + EventTypes.ServerACL: 100, }, "events_default": 0, "state_default": 50, "ban": 50, "kick": 50, "redact": 50, - "invite": 0, + "invite": 50, } if config["original_invitees_have_ops"]: for invitee in invite_list: power_level_content["users"][invitee] = 100 + # Power levels overrides are defined per chat preset + power_level_content.update(config["power_level_content_override"]) + if power_level_content_override: power_level_content.update(power_level_content_override) diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index dde448c296..597ead8653 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -964,8 +964,10 @@ class RoomMemberMasterHandler(RoomMemberHandler): # join dance for now, since we're kinda implicitly checking # that we are allowed to join when we decide whether or not we # need to do the invite/join dance. - yield self.federation_handler.do_invite_join( - remote_room_hosts, room_id, user.to_string(), content + yield defer.ensureDeferred( + self.federation_handler.do_invite_join( + remote_room_hosts, room_id, user.to_string(), content + ) ) yield self._user_joined_room(user, room_id) @@ -1002,8 +1004,10 @@ class RoomMemberMasterHandler(RoomMemberHandler): """ fed_handler = self.federation_handler try: - ret = yield fed_handler.do_remotely_reject_invite( - remote_room_hosts, room_id, target.to_string(), content=content, + ret = yield defer.ensureDeferred( + fed_handler.do_remotely_reject_invite( + remote_room_hosts, room_id, target.to_string(), content=content, + ) ) return ret except Exception as e: diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 68e6edace5..d93a276693 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -300,7 +300,7 @@ class StatsHandler(StateDeltasHandler): room_state["guest_access"] = event_content.get("guest_access") for room_id, state in room_to_state_updates.items(): - logger.info("Updating room_stats_state for %s: %s", room_id, state) + logger.debug("Updating room_stats_state for %s: %s", room_id, state) yield self.store.update_room_state(room_id, state) return room_to_stats_deltas, user_to_stats_deltas diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 14e7e6ab5b..db647914f4 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -14,20 +14,30 @@ # See the License for the specific language governing permissions and # limitations under the License. -import collections import itertools import logging +from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple from six import iteritems, itervalues +import attr from prometheus_client import Counter from synapse.api.constants import EventTypes, Membership +from synapse.api.filtering import FilterCollection +from synapse.events import EventBase from synapse.logging.context import LoggingContext from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter -from synapse.types import RoomStreamToken +from synapse.types import ( + Collection, + JsonDict, + RoomStreamToken, + StateMap, + StreamToken, + UserID, +) from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache @@ -63,17 +73,22 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_AGE = 30 * 60 * 1000 LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 -SyncConfig = collections.namedtuple( - "SyncConfig", ["user", "filter_collection", "is_guest", "request_key", "device_id"] -) +@attr.s(slots=True, frozen=True) +class SyncConfig: + user = attr.ib(type=UserID) + filter_collection = attr.ib(type=FilterCollection) + is_guest = attr.ib(type=bool) + request_key = attr.ib(type=Tuple[Any, ...]) + device_id = attr.ib(type=str) -class TimelineBatch( - collections.namedtuple("TimelineBatch", ["prev_batch", "events", "limited"]) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class TimelineBatch: + prev_batch = attr.ib(type=StreamToken) + events = attr.ib(type=List[EventBase]) + limited = attr.ib(bool) - def __nonzero__(self): + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ @@ -82,23 +97,17 @@ class TimelineBatch( __bool__ = __nonzero__ # python3 -class JoinedSyncResult( - collections.namedtuple( - "JoinedSyncResult", - [ - "room_id", # str - "timeline", # TimelineBatch - "state", # dict[(str, str), FrozenEvent] - "ephemeral", - "account_data", - "unread_notifications", - "summary", - ], - ) -): - __slots__ = [] - - def __nonzero__(self): +@attr.s(slots=True, frozen=True) +class JoinedSyncResult: + room_id = attr.ib(type=str) + timeline = attr.ib(type=TimelineBatch) + state = attr.ib(type=StateMap[EventBase]) + ephemeral = attr.ib(type=List[JsonDict]) + account_data = attr.ib(type=List[JsonDict]) + unread_notifications = attr.ib(type=JsonDict) + summary = attr.ib(type=Optional[JsonDict]) + + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ @@ -114,20 +123,14 @@ class JoinedSyncResult( __bool__ = __nonzero__ # python3 -class ArchivedSyncResult( - collections.namedtuple( - "ArchivedSyncResult", - [ - "room_id", # str - "timeline", # TimelineBatch - "state", # dict[(str, str), FrozenEvent] - "account_data", - ], - ) -): - __slots__ = [] - - def __nonzero__(self): +@attr.s(slots=True, frozen=True) +class ArchivedSyncResult: + room_id = attr.ib(type=str) + timeline = attr.ib(type=TimelineBatch) + state = attr.ib(type=StateMap[EventBase]) + account_data = attr.ib(type=List[JsonDict]) + + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if room needs to be part of the sync result. """ @@ -136,70 +139,88 @@ class ArchivedSyncResult( __bool__ = __nonzero__ # python3 -class InvitedSyncResult( - collections.namedtuple( - "InvitedSyncResult", - ["room_id", "invite"], # str # FrozenEvent: the invite event - ) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class InvitedSyncResult: + room_id = attr.ib(type=str) + invite = attr.ib(type=EventBase) - def __nonzero__(self): + def __nonzero__(self) -> bool: """Invited rooms should always be reported to the client""" return True __bool__ = __nonzero__ # python3 -class GroupsSyncResult( - collections.namedtuple("GroupsSyncResult", ["join", "invite", "leave"]) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class GroupsSyncResult: + join = attr.ib(type=JsonDict) + invite = attr.ib(type=JsonDict) + leave = attr.ib(type=JsonDict) - def __nonzero__(self): + def __nonzero__(self) -> bool: return bool(self.join or self.invite or self.leave) __bool__ = __nonzero__ # python3 -class DeviceLists( - collections.namedtuple( - "DeviceLists", - [ - "changed", # list of user_ids whose devices may have changed - "left", # list of user_ids whose devices we no longer track - ], - ) -): - __slots__ = [] +@attr.s(slots=True, frozen=True) +class DeviceLists: + """ + Attributes: + changed: List of user_ids whose devices may have changed + left: List of user_ids whose devices we no longer track + """ + + changed = attr.ib(type=Collection[str]) + left = attr.ib(type=Collection[str]) - def __nonzero__(self): + def __nonzero__(self) -> bool: return bool(self.changed or self.left) __bool__ = __nonzero__ # python3 -class SyncResult( - collections.namedtuple( - "SyncResult", - [ - "next_batch", # Token for the next sync - "presence", # List of presence events for the user. - "account_data", # List of account_data events for the user. - "joined", # JoinedSyncResult for each joined room. - "invited", # InvitedSyncResult for each invited room. - "archived", # ArchivedSyncResult for each archived room. - "to_device", # List of direct messages for the device. - "device_lists", # List of user_ids whose devices have changed - "device_one_time_keys_count", # Dict of algorithm to count for one time keys - # for this device - "groups", - ], - ) -): - __slots__ = [] - - def __nonzero__(self): +@attr.s +class _RoomChanges: + """The set of room entries to include in the sync, plus the set of joined + and left room IDs since last sync. + """ + + room_entries = attr.ib(type=List["RoomSyncResultBuilder"]) + invited = attr.ib(type=List[InvitedSyncResult]) + newly_joined_rooms = attr.ib(type=List[str]) + newly_left_rooms = attr.ib(type=List[str]) + + +@attr.s(slots=True, frozen=True) +class SyncResult: + """ + Attributes: + next_batch: Token for the next sync + presence: List of presence events for the user. + account_data: List of account_data events for the user. + joined: JoinedSyncResult for each joined room. + invited: InvitedSyncResult for each invited room. + archived: ArchivedSyncResult for each archived room. + to_device: List of direct messages for the device. + device_lists: List of user_ids whose devices have changed + device_one_time_keys_count: Dict of algorithm to count for one time keys + for this device + groups: Group updates, if any + """ + + next_batch = attr.ib(type=StreamToken) + presence = attr.ib(type=List[JsonDict]) + account_data = attr.ib(type=List[JsonDict]) + joined = attr.ib(type=List[JoinedSyncResult]) + invited = attr.ib(type=List[InvitedSyncResult]) + archived = attr.ib(type=List[ArchivedSyncResult]) + to_device = attr.ib(type=List[JsonDict]) + device_lists = attr.ib(type=DeviceLists) + device_one_time_keys_count = attr.ib(type=JsonDict) + groups = attr.ib(type=Optional[GroupsSyncResult]) + + def __nonzero__(self) -> bool: """Make the result appear empty if there are no updates. This is used to tell if the notifier needs to wait for more events when polling for events. @@ -243,13 +264,15 @@ class SyncHandler(object): ) async def wait_for_sync_for_user( - self, sync_config, since_token=None, timeout=0, full_state=False - ): + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + timeout: int = 0, + full_state: bool = False, + ) -> SyncResult: """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. - Returns: - Deferred[SyncResult] """ # If the user is not part of the mau group, then check that limits have # not been exceeded (if not part of the group by this point, almost certain @@ -268,8 +291,12 @@ class SyncHandler(object): return res async def _wait_for_sync_for_user( - self, sync_config, since_token, timeout, full_state - ): + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + timeout: int = 0, + full_state: bool = False, + ) -> SyncResult: if since_token is None: sync_type = "initial_sync" elif full_state: @@ -308,25 +335,33 @@ class SyncHandler(object): return result - def current_sync_for_user(self, sync_config, since_token=None, full_state=False): + async def current_sync_for_user( + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> SyncResult: """Get the sync for client needed to match what the server has now. - Returns: - A Deferred SyncResult. """ - return self.generate_sync_result(sync_config, since_token, full_state) + return await self.generate_sync_result(sync_config, since_token, full_state) - async def push_rules_for_user(self, user): + async def push_rules_for_user(self, user: UserID) -> JsonDict: user_id = user.to_string() rules = await self.store.get_push_rules_for_user(user_id) rules = format_push_rules_for_user(user, rules) return rules - async def ephemeral_by_room(self, sync_result_builder, now_token, since_token=None): + async def ephemeral_by_room( + self, + sync_result_builder: "SyncResultBuilder", + now_token: StreamToken, + since_token: Optional[StreamToken] = None, + ) -> Tuple[StreamToken, Dict[str, List[JsonDict]]]: """Get the ephemeral events for each room the user is in Args: - sync_result_builder(SyncResultBuilder) - now_token (StreamToken): Where the server is currently up to. - since_token (StreamToken): Where the server was when the client + sync_result_builder + now_token: Where the server is currently up to. + since_token: Where the server was when the client last synced. Returns: A tuple of the now StreamToken, updated to reflect the which typing @@ -351,7 +386,7 @@ class SyncHandler(object): ) now_token = now_token.copy_and_replace("typing_key", typing_key) - ephemeral_by_room = {} + ephemeral_by_room = {} # type: JsonDict for event in typing: # we want to exclude the room_id from the event, but modifying the @@ -383,13 +418,13 @@ class SyncHandler(object): async def _load_filtered_recents( self, - room_id, - sync_config, - now_token, - since_token=None, - recents=None, - newly_joined_room=False, - ): + room_id: str, + sync_config: SyncConfig, + now_token: StreamToken, + since_token: Optional[StreamToken] = None, + potential_recents: Optional[List[EventBase]] = None, + newly_joined_room: bool = False, + ) -> TimelineBatch: """ Returns: a Deferred TimelineBatch @@ -400,21 +435,29 @@ class SyncHandler(object): sync_config.filter_collection.blocks_all_room_timeline() ) - if recents is None or newly_joined_room or timeline_limit < len(recents): + if ( + potential_recents is None + or newly_joined_room + or timeline_limit < len(potential_recents) + ): limited = True else: limited = False - if recents: - recents = sync_config.filter_collection.filter_room_timeline(recents) + if potential_recents: + recents = sync_config.filter_collection.filter_room_timeline( + potential_recents + ) # We check if there are any state events, if there are then we pass # all current state events to the filter_events function. This is to # ensure that we always include current state in the timeline - current_state_ids = frozenset() + current_state_ids = frozenset() # type: FrozenSet[str] if any(e.is_state() for e in recents): - current_state_ids = await self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(itervalues(current_state_ids)) + current_state_ids_map = await self.state.get_current_state_ids( + room_id + ) + current_state_ids = frozenset(itervalues(current_state_ids_map)) recents = await filter_events_for_client( self.storage, @@ -466,8 +509,10 @@ class SyncHandler(object): # ensure that we always include current state in the timeline current_state_ids = frozenset() if any(e.is_state() for e in loaded_recents): - current_state_ids = await self.state.get_current_state_ids(room_id) - current_state_ids = frozenset(itervalues(current_state_ids)) + current_state_ids_map = await self.state.get_current_state_ids( + room_id + ) + current_state_ids = frozenset(itervalues(current_state_ids_map)) loaded_recents = await filter_events_for_client( self.storage, @@ -496,17 +541,15 @@ class SyncHandler(object): limited=limited or newly_joined_room, ) - async def get_state_after_event(self, event, state_filter=StateFilter.all()): + async def get_state_after_event( + self, event: EventBase, state_filter: StateFilter = StateFilter.all() + ) -> StateMap[str]: """ Get the room state after the given event Args: - event(synapse.events.EventBase): event of interest - state_filter (StateFilter): The state filter used to fetch state - from the database. - - Returns: - A Deferred map from ((type, state_key)->Event) + event: event of interest + state_filter: The state filter used to fetch state from the database. """ state_ids = await self.state_store.get_state_ids_for_event( event.event_id, state_filter=state_filter @@ -517,18 +560,17 @@ class SyncHandler(object): return state_ids async def get_state_at( - self, room_id, stream_position, state_filter=StateFilter.all() - ): + self, + room_id: str, + stream_position: StreamToken, + state_filter: StateFilter = StateFilter.all(), + ) -> StateMap[str]: """ Get the room state at a particular stream position Args: - room_id(str): room for which to get state - stream_position(StreamToken): point at which to get state - state_filter (StateFilter): The state filter used to fetch state - from the database. - - Returns: - A Deferred map from ((type, state_key)->Event) + room_id: room for which to get state + stream_position: point at which to get state + state_filter: The state filter used to fetch state from the database. """ # FIXME this claims to get the state at a stream position, but # get_recent_events_for_room operates by topo ordering. This therefore @@ -549,23 +591,25 @@ class SyncHandler(object): state = {} return state - async def compute_summary(self, room_id, sync_config, batch, state, now_token): + async def compute_summary( + self, + room_id: str, + sync_config: SyncConfig, + batch: TimelineBatch, + state: StateMap[EventBase], + now_token: StreamToken, + ) -> Optional[JsonDict]: """ Works out a room summary block for this room, summarising the number of joined members in the room, and providing the 'hero' members if the room has no name so clients can consistently name rooms. Also adds state events to 'state' if needed to describe the heroes. - Args: - room_id(str): - sync_config(synapse.handlers.sync.SyncConfig): - batch(synapse.handlers.sync.TimelineBatch): The timeline batch for - the room that will be sent to the user. - state(dict): dict of (type, state_key) -> Event as returned by - compute_state_delta - now_token(str): Token of the end of the current batch. - - Returns: - A deferred dict describing the room summary + Args + room_id + sync_config + batch: The timeline batch for the room that will be sent to the user. + state: State as returned by compute_state_delta + now_token: Token of the end of the current batch. """ # FIXME: we could/should get this from room_stats when matthew/stats lands @@ -684,7 +728,7 @@ class SyncHandler(object): return summary - def get_lazy_loaded_members_cache(self, cache_key): + def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache: cache = self.lazy_loaded_members_cache.get(cache_key) if cache is None: logger.debug("creating LruCache for %r", cache_key) @@ -695,23 +739,24 @@ class SyncHandler(object): return cache async def compute_state_delta( - self, room_id, batch, sync_config, since_token, now_token, full_state - ): + self, + room_id: str, + batch: TimelineBatch, + sync_config: SyncConfig, + since_token: Optional[StreamToken], + now_token: StreamToken, + full_state: bool, + ) -> StateMap[EventBase]: """ Works out the difference in state between the start of the timeline and the previous sync. Args: - room_id(str): - batch(synapse.handlers.sync.TimelineBatch): The timeline batch for - the room that will be sent to the user. - sync_config(synapse.handlers.sync.SyncConfig): - since_token(str|None): Token of the end of the previous batch. May - be None. - now_token(str): Token of the end of the current batch. - full_state(bool): Whether to force returning the full state. - - Returns: - A deferred dict of (type, state_key) -> Event + room_id: + batch: The timeline batch for the room that will be sent to the user. + sync_config: + since_token: Token of the end of the previous batch. May be None. + now_token: Token of the end of the current batch. + full_state: Whether to force returning the full state. """ # TODO(mjark) Check if the state events were received by the server # after the previous sync, since we need to include those state @@ -803,6 +848,10 @@ class SyncHandler(object): # about them). state_filter = StateFilter.all() + # If this is an initial sync then full_state should be set, and + # that case is handled above. We assert here to ensure that this + # is indeed the case. + assert since_token is not None state_at_previous_sync = await self.get_state_at( room_id, stream_position=since_token, state_filter=state_filter ) @@ -877,7 +926,7 @@ class SyncHandler(object): if t[0] == EventTypes.Member: cache.set(t[1], event_id) - state = {} + state = {} # type: Dict[str, EventBase] if state_ids: state = await self.store.get_events(list(state_ids.values())) @@ -889,7 +938,9 @@ class SyncHandler(object): if e.type != EventTypes.Aliases # until MSC2261 or alternative solution } - async def unread_notifs_for_room_id(self, room_id, sync_config): + async def unread_notifs_for_room_id( + self, room_id: str, sync_config: SyncConfig + ) -> Optional[Dict[str, str]]: with Measure(self.clock, "unread_notifs_for_room_id"): last_unread_event_id = await self.store.get_last_receipt_event_id_for_user( user_id=sync_config.user.to_string(), @@ -897,7 +948,6 @@ class SyncHandler(object): receipt_type="m.read", ) - notifs = [] if last_unread_event_id: notifs = await self.store.get_unread_event_push_actions_by_room_for_user( room_id, sync_config.user.to_string(), last_unread_event_id @@ -909,17 +959,12 @@ class SyncHandler(object): return None async def generate_sync_result( - self, sync_config, since_token=None, full_state=False - ): + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> SyncResult: """Generates a sync result. - - Args: - sync_config (SyncConfig) - since_token (StreamToken) - full_state (bool) - - Returns: - Deferred(SyncResult) """ # NB: The now_token gets changed by some of the generate_sync_* methods, # this is due to some of the underlying streams not supporting the ability @@ -927,7 +972,7 @@ class SyncHandler(object): # Always use the `now_token` in `SyncResultBuilder` now_token = await self.event_sources.get_current_token() - logger.info( + logger.debug( "Calculating sync response for %r between %s and %s", sync_config.user, since_token, @@ -981,7 +1026,7 @@ class SyncHandler(object): ) device_id = sync_config.device_id - one_time_key_counts = {} + one_time_key_counts = {} # type: JsonDict if device_id: one_time_key_counts = await self.store.count_e2e_one_time_keys( user_id, device_id @@ -1011,7 +1056,9 @@ class SyncHandler(object): ) @measure_func("_generate_sync_entry_for_groups") - async def _generate_sync_entry_for_groups(self, sync_result_builder): + async def _generate_sync_entry_for_groups( + self, sync_result_builder: "SyncResultBuilder" + ) -> None: user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token now_token = sync_result_builder.now_token @@ -1056,27 +1103,22 @@ class SyncHandler(object): @measure_func("_generate_sync_entry_for_device_list") async def _generate_sync_entry_for_device_list( self, - sync_result_builder, - newly_joined_rooms, - newly_joined_or_invited_users, - newly_left_rooms, - newly_left_users, - ): + sync_result_builder: "SyncResultBuilder", + newly_joined_rooms: Set[str], + newly_joined_or_invited_users: Set[str], + newly_left_rooms: Set[str], + newly_left_users: Set[str], + ) -> DeviceLists: """Generate the DeviceLists section of sync Args: - sync_result_builder (SyncResultBuilder) - newly_joined_rooms (set[str]): Set of rooms user has joined since - previous sync - newly_joined_or_invited_users (set[str]): Set of users that have - joined or been invited to a room since previous sync. - newly_left_rooms (set[str]): Set of rooms user has left since + sync_result_builder + newly_joined_rooms: Set of rooms user has joined since previous sync + newly_joined_or_invited_users: Set of users that have joined or + been invited to a room since previous sync. + newly_left_rooms: Set of rooms user has left since previous sync + newly_left_users: Set of users that have left a room we're in since previous sync - newly_left_users (set[str]): Set of users that have left a room - we're in since previous sync - - Returns: - Deferred[DeviceLists] """ user_id = sync_result_builder.sync_config.user.to_string() @@ -1137,15 +1179,11 @@ class SyncHandler(object): else: return DeviceLists(changed=[], left=[]) - async def _generate_sync_entry_for_to_device(self, sync_result_builder): + async def _generate_sync_entry_for_to_device( + self, sync_result_builder: "SyncResultBuilder" + ) -> None: """Generates the portion of the sync response. Populates `sync_result_builder` with the result. - - Args: - sync_result_builder(SyncResultBuilder) - - Returns: - Deferred(dict): A dictionary containing the per room account data. """ user_id = sync_result_builder.sync_config.user.to_string() device_id = sync_result_builder.sync_config.device_id @@ -1183,15 +1221,17 @@ class SyncHandler(object): else: sync_result_builder.to_device = [] - async def _generate_sync_entry_for_account_data(self, sync_result_builder): + async def _generate_sync_entry_for_account_data( + self, sync_result_builder: "SyncResultBuilder" + ) -> Dict[str, Dict[str, JsonDict]]: """Generates the account data portion of the sync response. Populates `sync_result_builder` with the result. Args: - sync_result_builder(SyncResultBuilder) + sync_result_builder Returns: - Deferred(dict): A dictionary containing the per room account data. + A dictionary containing the per room account data. """ sync_config = sync_result_builder.sync_config user_id = sync_result_builder.sync_config.user.to_string() @@ -1235,18 +1275,21 @@ class SyncHandler(object): return account_data_by_room async def _generate_sync_entry_for_presence( - self, sync_result_builder, newly_joined_rooms, newly_joined_or_invited_users - ): + self, + sync_result_builder: "SyncResultBuilder", + newly_joined_rooms: Set[str], + newly_joined_or_invited_users: Set[str], + ) -> None: """Generates the presence portion of the sync response. Populates the `sync_result_builder` with the result. Args: - sync_result_builder(SyncResultBuilder) - newly_joined_rooms(list): List of rooms that the user has joined - since the last sync (or empty if an initial sync) - newly_joined_or_invited_users(list): List of users that have joined - or been invited to rooms since the last sync (or empty if an initial - sync) + sync_result_builder + newly_joined_rooms: Set of rooms that the user has joined since + the last sync (or empty if an initial sync) + newly_joined_or_invited_users: Set of users that have joined or + been invited to rooms since the last sync (or empty if an + initial sync) """ now_token = sync_result_builder.now_token sync_config = sync_result_builder.sync_config @@ -1290,17 +1333,19 @@ class SyncHandler(object): sync_result_builder.presence = presence async def _generate_sync_entry_for_rooms( - self, sync_result_builder, account_data_by_room - ): + self, + sync_result_builder: "SyncResultBuilder", + account_data_by_room: Dict[str, Dict[str, JsonDict]], + ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]: """Generates the rooms portion of the sync response. Populates the `sync_result_builder` with the result. Args: - sync_result_builder(SyncResultBuilder) - account_data_by_room(dict): Dictionary of per room account data + sync_result_builder + account_data_by_room: Dictionary of per room account data Returns: - Deferred(tuple): Returns a 4-tuple of + Returns a 4-tuple of `(newly_joined_rooms, newly_joined_or_invited_users, newly_left_rooms, newly_left_users)` """ @@ -1311,7 +1356,7 @@ class SyncHandler(object): ) if block_all_room_ephemeral: - ephemeral_by_room = {} + ephemeral_by_room = {} # type: Dict[str, List[JsonDict]] else: now_token, ephemeral_by_room = await self.ephemeral_by_room( sync_result_builder, @@ -1332,7 +1377,7 @@ class SyncHandler(object): ) if not tags_by_room: logger.debug("no-oping sync") - return [], [], [], [] + return set(), set(), set(), set() ignored_account_data = await self.store.get_global_account_data_by_type_for_user( "m.ignored_user_list", user_id=user_id @@ -1344,19 +1389,22 @@ class SyncHandler(object): ignored_users = frozenset() if since_token: - res = await self._get_rooms_changed(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms, newly_left_rooms = res - + room_changes = await self._get_rooms_changed( + sync_result_builder, ignored_users + ) tags_by_room = await self.store.get_updated_tags( user_id, since_token.account_data_key ) else: - res = await self._get_all_rooms(sync_result_builder, ignored_users) - room_entries, invited, newly_joined_rooms = res - newly_left_rooms = [] + room_changes = await self._get_all_rooms(sync_result_builder, ignored_users) tags_by_room = await self.store.get_tags_for_user(user_id) + room_entries = room_changes.room_entries + invited = room_changes.invited + newly_joined_rooms = room_changes.newly_joined_rooms + newly_left_rooms = room_changes.newly_left_rooms + def handle_room_entries(room_entry): return self._generate_room_entry( sync_result_builder, @@ -1396,13 +1444,15 @@ class SyncHandler(object): newly_left_users -= newly_joined_or_invited_users return ( - newly_joined_rooms, + set(newly_joined_rooms), newly_joined_or_invited_users, - newly_left_rooms, + set(newly_left_rooms), newly_left_users, ) - async def _have_rooms_changed(self, sync_result_builder): + async def _have_rooms_changed( + self, sync_result_builder: "SyncResultBuilder" + ) -> bool: """Returns whether there may be any new events that should be sent down the sync. Returns True if there are. """ @@ -1426,22 +1476,10 @@ class SyncHandler(object): return True return False - async def _get_rooms_changed(self, sync_result_builder, ignored_users): + async def _get_rooms_changed( + self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str] + ) -> _RoomChanges: """Gets the the changes that have happened since the last sync. - - Args: - sync_result_builder(SyncResultBuilder) - ignored_users(set(str)): Set of users ignored by user. - - Returns: - Deferred(tuple): Returns a tuple of the form: - `(room_entries, invited_rooms, newly_joined_rooms, newly_left_rooms)` - - where: - room_entries is a list [RoomSyncResultBuilder] - invited_rooms is a list [InvitedSyncResult] - newly_joined_rooms is a list[str] of room ids - newly_left_rooms is a list[str] of room ids """ user_id = sync_result_builder.sync_config.user.to_string() since_token = sync_result_builder.since_token @@ -1455,7 +1493,7 @@ class SyncHandler(object): user_id, since_token.room_key, now_token.room_key ) - mem_change_events_by_room_id = {} + mem_change_events_by_room_id = {} # type: Dict[str, List[EventBase]] for event in rooms_changed: mem_change_events_by_room_id.setdefault(event.room_id, []).append(event) @@ -1464,7 +1502,7 @@ class SyncHandler(object): room_entries = [] invited = [] for room_id, events in iteritems(mem_change_events_by_room_id): - logger.info( + logger.debug( "Membership changes in %s: [%s]", room_id, ", ".join(("%s (%s)" % (e.event_id, e.membership) for e in events)), @@ -1574,7 +1612,7 @@ class SyncHandler(object): # This is all screaming out for a refactor, as the logic here is # subtle and the moving parts numerous. if leave_event.internal_metadata.is_out_of_band_membership(): - batch_events = [leave_event] + batch_events = [leave_event] # type: Optional[List[EventBase]] else: batch_events = None @@ -1640,18 +1678,17 @@ class SyncHandler(object): ) room_entries.append(entry) - return room_entries, invited, newly_joined_rooms, newly_left_rooms + return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms) - async def _get_all_rooms(self, sync_result_builder, ignored_users): + async def _get_all_rooms( + self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str] + ) -> _RoomChanges: """Returns entries for all rooms for the user. Args: - sync_result_builder(SyncResultBuilder) - ignored_users(set(str)): Set of users ignored by user. + sync_result_builder + ignored_users: Set of users ignored by user. - Returns: - Deferred(tuple): Returns a tuple of the form: - `([RoomSyncResultBuilder], [InvitedSyncResult], [])` """ user_id = sync_result_builder.sync_config.user.to_string() @@ -1713,30 +1750,30 @@ class SyncHandler(object): ) ) - return room_entries, invited, [] + return _RoomChanges(room_entries, invited, [], []) async def _generate_room_entry( self, - sync_result_builder, - ignored_users, - room_builder, - ephemeral, - tags, - account_data, - always_include=False, + sync_result_builder: "SyncResultBuilder", + ignored_users: Set[str], + room_builder: "RoomSyncResultBuilder", + ephemeral: List[JsonDict], + tags: Optional[List[JsonDict]], + account_data: Dict[str, JsonDict], + always_include: bool = False, ): """Populates the `joined` and `archived` section of `sync_result_builder` based on the `room_builder`. Args: - sync_result_builder(SyncResultBuilder) - ignored_users(set(str)): Set of users ignored by user. - room_builder(RoomSyncResultBuilder) - ephemeral(list): List of new ephemeral events for room - tags(list): List of *all* tags for room, or None if there has been + sync_result_builder + ignored_users: Set of users ignored by user. + room_builder + ephemeral: List of new ephemeral events for room + tags: List of *all* tags for room, or None if there has been no change. - account_data(list): List of new account data for room - always_include(bool): Always include this room in the sync response, + account_data: List of new account data for room + always_include: Always include this room in the sync response, even if empty. """ newly_joined = room_builder.newly_joined @@ -1762,7 +1799,7 @@ class SyncHandler(object): sync_config, now_token=upto_token, since_token=since_token, - recents=events, + potential_recents=events, newly_joined_room=newly_joined, ) @@ -1813,7 +1850,7 @@ class SyncHandler(object): room_id, batch, sync_config, since_token, now_token, full_state=full_state ) - summary = {} + summary = {} # type: Optional[JsonDict] # we include a summary in room responses when we're lazy loading # members (as the client otherwise doesn't have enough info to form @@ -1837,7 +1874,7 @@ class SyncHandler(object): ) if room_builder.rtype == "joined": - unread_notifications = {} + unread_notifications = {} # type: Dict[str, str] room_sync = JoinedSyncResult( room_id=room_id, timeline=batch, @@ -1859,23 +1896,25 @@ class SyncHandler(object): if batch.limited and since_token: user_id = sync_result_builder.sync_config.user.to_string() - logger.info( + logger.debug( "Incremental gappy sync of %s for user %s with %d state events" % (room_id, user_id, len(state)) ) elif room_builder.rtype == "archived": - room_sync = ArchivedSyncResult( + archived_room_sync = ArchivedSyncResult( room_id=room_id, timeline=batch, state=state, account_data=account_data_events, ) - if room_sync or always_include: - sync_result_builder.archived.append(room_sync) + if archived_room_sync or always_include: + sync_result_builder.archived.append(archived_room_sync) else: raise Exception("Unrecognized rtype: %r", room_builder.rtype) - async def get_rooms_for_user_at(self, user_id, stream_ordering): + async def get_rooms_for_user_at( + self, user_id: str, stream_ordering: int + ) -> FrozenSet[str]: """Get set of joined rooms for a user at the given stream ordering. The stream ordering *must* be recent, otherwise this may throw an @@ -1883,12 +1922,11 @@ class SyncHandler(object): current token, which should be perfectly fine). Args: - user_id (str) - stream_ordering (int) + user_id + stream_ordering ReturnValue: - Deferred[frozenset[str]]: Set of room_ids the user is in at given - stream_ordering. + Set of room_ids the user is in at given stream_ordering. """ joined_rooms = await self.store.get_rooms_for_user_with_stream_ordering(user_id) @@ -1915,11 +1953,10 @@ class SyncHandler(object): if user_id in users_in_room: joined_room_ids.add(room_id) - joined_room_ids = frozenset(joined_room_ids) - return joined_room_ids + return frozenset(joined_room_ids) -def _action_has_highlight(actions): +def _action_has_highlight(actions: List[JsonDict]) -> bool: for action in actions: try: if action.get("set_tweak", None) == "highlight": @@ -1931,22 +1968,23 @@ def _action_has_highlight(actions): def _calculate_state( - timeline_contains, timeline_start, previous, current, lazy_load_members -): + timeline_contains: StateMap[str], + timeline_start: StateMap[str], + previous: StateMap[str], + current: StateMap[str], + lazy_load_members: bool, +) -> StateMap[str]: """Works out what state to include in a sync response. Args: - timeline_contains (dict): state in the timeline - timeline_start (dict): state at the start of the timeline - previous (dict): state at the end of the previous sync (or empty dict + timeline_contains: state in the timeline + timeline_start: state at the start of the timeline + previous: state at the end of the previous sync (or empty dict if this is an initial sync) - current (dict): state at the end of the timeline - lazy_load_members (bool): whether to return members from timeline_start + current: state at the end of the timeline + lazy_load_members: whether to return members from timeline_start or not. assumes that timeline_start has already been filtered to include only the members the client needs to know about. - - Returns: - dict """ event_id_to_key = { e: key @@ -1983,15 +2021,16 @@ def _calculate_state( return {event_id_to_key[e]: e for e in state_ids} -class SyncResultBuilder(object): +@attr.s +class SyncResultBuilder: """Used to help build up a new SyncResult for a user Attributes: - sync_config (SyncConfig) - full_state (bool) - since_token (StreamToken) - now_token (StreamToken) - joined_room_ids (list[str]) + sync_config + full_state: The full_state flag as specified by user + since_token: The token supplied by user, or None. + now_token: The token to sync up to. + joined_room_ids: List of rooms the user is joined to # The following mirror the fields in a sync response presence (list) @@ -1999,61 +2038,45 @@ class SyncResultBuilder(object): joined (list[JoinedSyncResult]) invited (list[InvitedSyncResult]) archived (list[ArchivedSyncResult]) - device (list) groups (GroupsSyncResult|None) to_device (list) """ - def __init__( - self, sync_config, full_state, since_token, now_token, joined_room_ids - ): - """ - Args: - sync_config (SyncConfig) - full_state (bool): The full_state flag as specified by user - since_token (StreamToken): The token supplied by user, or None. - now_token (StreamToken): The token to sync up to. - joined_room_ids (list[str]): List of rooms the user is joined to - """ - self.sync_config = sync_config - self.full_state = full_state - self.since_token = since_token - self.now_token = now_token - self.joined_room_ids = joined_room_ids - - self.presence = [] - self.account_data = [] - self.joined = [] - self.invited = [] - self.archived = [] - self.device = [] - self.groups = None - self.to_device = [] + sync_config = attr.ib(type=SyncConfig) + full_state = attr.ib(type=bool) + since_token = attr.ib(type=Optional[StreamToken]) + now_token = attr.ib(type=StreamToken) + joined_room_ids = attr.ib(type=FrozenSet[str]) + + presence = attr.ib(type=List[JsonDict], default=attr.Factory(list)) + account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list)) + joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list)) + invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list)) + archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list)) + groups = attr.ib(type=Optional[GroupsSyncResult], default=None) + to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list)) +@attr.s class RoomSyncResultBuilder(object): """Stores information needed to create either a `JoinedSyncResult` or `ArchivedSyncResult`. + + Attributes: + room_id + rtype: One of `"joined"` or `"archived"` + events: List of events to include in the room (more events may be added + when generating result). + newly_joined: If the user has newly joined the room + full_state: Whether the full state should be sent in result + since_token: Earliest point to return events from, or None + upto_token: Latest point to return events from. """ - def __init__( - self, room_id, rtype, events, newly_joined, full_state, since_token, upto_token - ): - """ - Args: - room_id(str) - rtype(str): One of `"joined"` or `"archived"` - events(list[FrozenEvent]): List of events to include in the room - (more events may be added when generating result). - newly_joined(bool): If the user has newly joined the room - full_state(bool): Whether the full state should be sent in result - since_token(StreamToken): Earliest point to return events from, or None - upto_token(StreamToken): Latest point to return events from. - """ - self.room_id = room_id - self.rtype = rtype - self.events = events - self.newly_joined = newly_joined - self.full_state = full_state - self.since_token = since_token - self.upto_token = upto_token + room_id = attr.ib(type=str) + rtype = attr.ib(type=str) + events = attr.ib(type=Optional[List[EventBase]]) + newly_joined = attr.ib(type=bool) + full_state = attr.ib(type=bool) + since_token = attr.ib(type=Optional[StreamToken]) + upto_token = attr.ib(type=StreamToken) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 624f05ab5b..722760c59d 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -52,6 +52,7 @@ class UserDirectoryHandler(StateDeltasHandler): self.is_mine_id = hs.is_mine_id self.update_user_directory = hs.config.update_user_directory self.search_all_users = hs.config.user_directory_search_all_users + self.spam_checker = hs.get_spam_checker() # The current position in the current_state_delta stream self.pos = None @@ -65,7 +66,7 @@ class UserDirectoryHandler(StateDeltasHandler): # we start populating the user directory self.clock.call_later(0, self.notify_new_event) - def search_users(self, user_id, search_term, limit): + async def search_users(self, user_id, search_term, limit): """Searches for users in directory Returns: @@ -82,7 +83,16 @@ class UserDirectoryHandler(StateDeltasHandler): ] } """ - return self.store.search_user_dir(user_id, search_term, limit) + results = await self.store.search_user_dir(user_id, search_term, limit) + + # Remove any spammy users from the results. + results["results"] = [ + user + for user in results["results"] + if not self.spam_checker.check_username_for_spam(user) + ] + + return results def notify_new_event(self): """Called when there may be more deltas to process @@ -149,7 +159,7 @@ class UserDirectoryHandler(StateDeltasHandler): self.pos, room_max_stream_ordering ) - logger.info("Handling %d state deltas", len(deltas)) + logger.debug("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) self.pos = max_pos @@ -195,7 +205,7 @@ class UserDirectoryHandler(StateDeltasHandler): room_id, self.server_name ) if not is_in_room: - logger.info("Server left room: %r", room_id) + logger.debug("Server left room: %r", room_id) # Fetch all the users that we marked as being in user # directory due to being in the room and then check if # need to remove those users or not diff --git a/synapse/http/site.py b/synapse/http/site.py index 911251c0bc..e092193c9c 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -225,7 +225,7 @@ class SynapseRequest(Request): self.start_time, name=servlet_name, method=self.get_method() ) - self.site.access_logger.info( + self.site.access_logger.debug( "%s - %s - Received request: %s %s", self.getClientIP(), self.site.site_tag, diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index afa9ef31bf..73a63ec78a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -402,7 +402,7 @@ class HttpPusher(object): Args: badge (int): number of unread messages """ - logger.info("Sending updated badge count %d to %s", badge, self.name) + logger.debug("Sending updated badge count %d to %s", badge, self.name) d = { "notification": { "id": "", diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py index 3455741195..2107b5dc56 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -21,7 +21,7 @@ from six import text_type from six.moves import http_client from synapse.api.constants import UserTypes -from synapse.api.errors import Codes, SynapseError +from synapse.api.errors import Codes, NotFoundError, SynapseError from synapse.http.servlet import ( RestServlet, assert_params_in_dict, @@ -105,7 +105,7 @@ class UsersRestServletV2(RestServlet): class UserRestServletV2(RestServlet): - PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>@[^/]+)$"),) + PATTERNS = (re.compile("^/_synapse/admin/v2/users/(?P<user_id>[^/]+)$"),) """Get request to list user details. This needs user to have administrator access in Synapse. @@ -136,6 +136,8 @@ class UserRestServletV2(RestServlet): self.hs = hs self.auth = hs.get_auth() self.admin_handler = hs.get_handlers().admin_handler + self.store = hs.get_datastore() + self.auth_handler = hs.get_auth_handler() self.profile_handler = hs.get_profile_handler() self.set_password_handler = hs.get_set_password_handler() self.deactivate_account_handler = hs.get_deactivate_account_handler() @@ -150,6 +152,9 @@ class UserRestServletV2(RestServlet): ret = await self.admin_handler.get_user(target_user) + if not ret: + raise NotFoundError("User not found") + return 200, ret async def on_PUT(self, request, user_id): @@ -163,6 +168,7 @@ class UserRestServletV2(RestServlet): raise SynapseError(400, "This endpoint can only be used with local users") user = await self.admin_handler.get_user(target_user) + user_id = target_user.to_string() if user: # modify user if "displayname" in body: @@ -170,6 +176,29 @@ class UserRestServletV2(RestServlet): target_user, requester, body["displayname"], True ) + if "threepids" in body: + # check for required parameters for each threepid + for threepid in body["threepids"]: + assert_params_in_dict(threepid, ["medium", "address"]) + + # remove old threepids from user + threepids = await self.store.user_get_threepids(user_id) + for threepid in threepids: + try: + await self.auth_handler.delete_threepid( + user_id, threepid["medium"], threepid["address"], None + ) + except Exception: + logger.exception("Failed to remove threepids") + raise SynapseError(500, "Failed to remove threepids") + + # add new threepids to user + current_time = self.hs.get_clock().time_msec() + for threepid in body["threepids"]: + await self.auth_handler.add_threepid( + user_id, threepid["medium"], threepid["address"], current_time + ) + if "avatar_url" in body: await self.profile_handler.set_avatar_url( target_user, requester, body["avatar_url"], True @@ -221,6 +250,7 @@ class UserRestServletV2(RestServlet): admin = body.get("admin", None) user_type = body.get("user_type", None) displayname = body.get("displayname", None) + threepids = body.get("threepids", None) if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES: raise SynapseError(400, "Invalid user type") @@ -232,6 +262,18 @@ class UserRestServletV2(RestServlet): default_display_name=displayname, user_type=user_type, ) + + if "threepids" in body: + # check for required parameters for each threepid + for threepid in body["threepids"]: + assert_params_in_dict(threepid, ["medium", "address"]) + + current_time = self.hs.get_clock().time_msec() + for threepid in body["threepids"]: + await self.auth_handler.add_threepid( + user_id, threepid["medium"], threepid["address"], current_time + ) + if "avatar_url" in body: await self.profile_handler.set_avatar_url( user_id, requester, body["avatar_url"], True @@ -568,7 +610,7 @@ class UserAdminServlet(RestServlet): {} """ - PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>@[^/]*)/admin$"),) + PATTERNS = (re.compile("^/_synapse/admin/v1/users/(?P<user_id>[^/]*)/admin$"),) def __init__(self, hs): self.hs = hs diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index 3d0fefb4df..3eeb3607f4 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -52,7 +52,6 @@ class VersionsRestServlet(RestServlet): ], # as per MSC1497: "unstable_features": { - "m.lazy_load_members": True, # as per MSC2190, as amended by MSC2264 # to be removed in r0.6.0 "m.id_access_token": True, diff --git a/synapse/server.pyi b/synapse/server.pyi index 90347ac23e..40eabfe5d9 100644 --- a/synapse/server.pyi +++ b/synapse/server.pyi @@ -107,3 +107,5 @@ class HomeServer(object): self, ) -> synapse.replication.tcp.client.ReplicationClientHandler: pass + def is_mine_id(self, domain_id: str) -> bool: + pass diff --git a/synapse/spam_checker_api/__init__.py b/synapse/spam_checker_api/__init__.py index efcc10f808..9b78924d96 100644 --- a/synapse/spam_checker_api/__init__.py +++ b/synapse/spam_checker_api/__init__.py @@ -18,6 +18,10 @@ from twisted.internet import defer from synapse.storage.state import StateFilter +MYPY = False +if MYPY: + import synapse.server + logger = logging.getLogger(__name__) @@ -26,18 +30,18 @@ class SpamCheckerApi(object): access to rooms and other relevant information. """ - def __init__(self, hs): + def __init__(self, hs: "synapse.server.HomeServer"): self.hs = hs self._store = hs.get_datastore() @defer.inlineCallbacks - def get_state_events_in_room(self, room_id, types): + def get_state_events_in_room(self, room_id: str, types: tuple) -> defer.Deferred: """Gets state events for the given room. Args: - room_id (string): The room ID to get state events in. - types (tuple): The event type and state key (using None + room_id: The room ID to get state events in. + types: The event type and state key (using None to represent 'any') of the room state to acquire. Returns: diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py index 60c67457b4..1746f40adf 100644 --- a/synapse/storage/data_stores/main/event_federation.py +++ b/synapse/storage/data_stores/main/event_federation.py @@ -26,6 +26,7 @@ from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause from synapse.storage.data_stores.main.events_worker import EventsWorkerStore from synapse.storage.data_stores.main.signatures import SignatureWorkerStore from synapse.storage.database import Database +from synapse.storage.engines import PostgresEngine from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -61,6 +62,28 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas ) def _get_auth_chain_ids_txn(self, txn, event_ids, include_given): + if isinstance(self.database_engine, PostgresEngine): + # For efficiency we make the database do this if we can. + sql = """ + WITH RECURSIVE auth_chain(event_id) AS ( + SELECT auth_id FROM event_auth WHERE event_id = ANY(?) + UNION + SELECT auth_id FROM event_auth + INNER JOIN auth_chain USING (event_id) + ) + SELECT event_id FROM auth_chain + """ + txn.execute(sql, (list(event_ids),)) + + results = set(event_id for event_id, in txn) + + if include_given: + results.update(event_ids) + + return list(results) + + # Database doesn't necessarily support recursive CTE, so we fall + # back to do doing it manually. if include_given: results = set(event_ids) else: diff --git a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql index a133d87a19..aec06c8261 100644 --- a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql +++ b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql @@ -15,5 +15,8 @@ -- Add background update to go and delete current state events for rooms the -- server is no longer in. -INSERT into background_updates (update_name, progress_json) - VALUES ('delete_old_current_state_events', '{}'); +-- +-- this relies on the 'membership' column of current_state_events, so make sure +-- that's populated first! +INSERT into background_updates (update_name, progress_json, depends_on) + VALUES ('delete_old_current_state_events', '{}', 'current_state_events_membership'); diff --git a/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.postgres b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.postgres new file mode 100644 index 0000000000..c601cff6de --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.postgres @@ -0,0 +1,35 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- when we first added the room_version column, it was populated via a background +-- update. We now need it to be populated before synapse starts, so we populate +-- any remaining rows with a NULL room version now. For servers which have completed +-- the background update, this will be pretty quick. + +-- the following query will set room_version to NULL if no create event is found for +-- the room in current_state_events, and will set it to '1' if a create event with no +-- room_version is found. + +UPDATE rooms SET room_version=( + SELECT COALESCE(json::json->'content'->>'room_version','1') + FROM current_state_events cse INNER JOIN event_json ej USING (event_id) + WHERE cse.room_id=rooms.room_id AND cse.type='m.room.create' AND cse.state_key='' +) WHERE rooms.room_version IS NULL; + +-- we still allow the background update to complete: it has the useful side-effect of +-- populating `rooms` with any missing rooms (based on the current_state_events table). + +-- see also rooms_version_column_2.sql.sqlite which has a copy of the above query, using +-- sqlite syntax for the json extraction. diff --git a/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.sqlite new file mode 100644 index 0000000000..335c6f2074 --- /dev/null +++ b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column_2.sql.sqlite @@ -0,0 +1,22 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- see rooms_version_column_2.sql.postgres for details of what's going on here. + +UPDATE rooms SET room_version=( + SELECT COALESCE(json_extract(ej.json, '$.content.room_version'), '1') + FROM current_state_events cse INNER JOIN event_json ej USING (event_id) + WHERE cse.room_id=rooms.room_id AND cse.type='m.room.create' AND cse.state_key='' +) WHERE rooms.room_version IS NULL; diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py index 7af1495e47..380c1ec7da 100644 --- a/synapse/storage/data_stores/main/stats.py +++ b/synapse/storage/data_stores/main/stats.py @@ -271,31 +271,6 @@ class StatsStore(StateDeltasStore): return slice_list - def get_room_stats_state(self, room_id): - """ - Returns the current room_stats_state for a room. - - Args: - room_id (str): The ID of the room to return state for. - - Returns (dict): - Dictionary containing these keys: - "name", "topic", "canonical_alias", "avatar", "join_rules", - "history_visibility" - """ - return self.db.simple_select_one( - "room_stats_state", - {"room_id": room_id}, - retcols=( - "name", - "topic", - "canonical_alias", - "avatar", - "join_rules", - "history_visibility", - ), - ) - @cached() def get_earliest_token_for_stats(self, stats_type, id): """ diff --git a/synapse/storage/data_stores/main/user_directory.py b/synapse/storage/data_stores/main/user_directory.py index 90c180ec6d..6b8130bf0f 100644 --- a/synapse/storage/data_stores/main/user_directory.py +++ b/synapse/storage/data_stores/main/user_directory.py @@ -183,7 +183,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) return 1 - logger.info( + logger.debug( "Processing the next %d rooms of %d remaining" % (len(rooms_to_work_on), progress["remaining"]) ) @@ -308,7 +308,7 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): ) return 1 - logger.info( + logger.debug( "Processing the next %d users of %d remaining" % (len(users_to_work_on), progress["remaining"]) ) diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 1003dd84a5..3eeb2f7c04 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -343,7 +343,7 @@ class Database(object): top_three_counters = self._txn_perf_counters.interval(duration, limit=3) - perf_logger.info( + perf_logger.debug( "Total database time: %.3f%% {%s}", ratio * 100, top_three_counters ) diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index af3fd67ab9..a5370ed527 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -390,7 +390,7 @@ class EventsPersistenceStorage(object): state_delta_reuse_delta_counter.inc() break - logger.info("Calculating state delta for room %s", room_id) + logger.debug("Calculating state delta for room %s", room_id) with Measure( self._clock, "persist_events.get_new_state_after_events" ): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 04b6abdc24..581dffd8a0 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -73,6 +73,10 @@ class ObservableDeferred(object): def errback(f): object.__setattr__(self, "_result", (False, f)) while self._observers: + # This is a little bit of magic to correctly propagate stack + # traces when we `await` on one of the observer deferreds. + f.value.__failure__ = f + try: # TODO: Handle errors here. self._observers.pop().errback(f) diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 82d3eefe0e..b68f9fe0d4 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -144,7 +144,7 @@ class ResponseCache(object): """ result = self.get(key) if not result: - logger.info( + logger.debug( "[%s]: no cached result for [%s], calculating new one", self._name, key ) d = run_in_background(callback, *args, **kwargs) diff --git a/tests/api/test_filtering.py b/tests/api/test_filtering.py index 63d8633582..4e67503cf0 100644 --- a/tests/api/test_filtering.py +++ b/tests/api/test_filtering.py @@ -25,7 +25,7 @@ from twisted.internet import defer from synapse.api.constants import EventContentFields from synapse.api.errors import SynapseError from synapse.api.filtering import Filter -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from tests import unittest from tests.utils import DeferredMockCallable, MockHttpResource, setup_test_homeserver @@ -38,7 +38,7 @@ def MockEvent(**kwargs): kwargs["event_id"] = "fake_event_id" if "type" not in kwargs: kwargs["type"] = "fake_type" - return FrozenEvent(kwargs) + return make_event_from_dict(kwargs) class FilteringTestCase(unittest.TestCase): diff --git a/tests/crypto/test_event_signing.py b/tests/crypto/test_event_signing.py index 6143a50ab2..62f639a18d 100644 --- a/tests/crypto/test_event_signing.py +++ b/tests/crypto/test_event_signing.py @@ -19,7 +19,7 @@ from unpaddedbase64 import decode_base64 from synapse.api.room_versions import RoomVersions from synapse.crypto.event_signing import add_hashes_and_signatures -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from tests import unittest @@ -54,7 +54,7 @@ class EventSigningTestCase(unittest.TestCase): RoomVersions.V1, event_dict, HOSTNAME, self.signing_key ) - event = FrozenEvent(event_dict) + event = make_event_from_dict(event_dict) self.assertTrue(hasattr(event, "hashes")) self.assertIn("sha256", event.hashes) @@ -88,7 +88,7 @@ class EventSigningTestCase(unittest.TestCase): RoomVersions.V1, event_dict, HOSTNAME, self.signing_key ) - event = FrozenEvent(event_dict) + event = make_event_from_dict(event_dict) self.assertTrue(hasattr(event, "hashes")) self.assertIn("sha256", event.hashes) diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py index 2b13980dfd..45d55b9e94 100644 --- a/tests/events/test_utils.py +++ b/tests/events/test_utils.py @@ -13,8 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. - -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.events.utils import ( copy_power_levels_contents, prune_event, @@ -30,7 +29,7 @@ def MockEvent(**kwargs): kwargs["event_id"] = "fake_event_id" if "type" not in kwargs: kwargs["type"] = "fake_type" - return FrozenEvent(kwargs) + return make_event_from_dict(kwargs) class PruneEventTestCase(unittest.TestCase): @@ -38,7 +37,9 @@ class PruneEventTestCase(unittest.TestCase): `matchdict` when it is redacted. """ def run_test(self, evdict, matchdict): - self.assertEquals(prune_event(FrozenEvent(evdict)).get_dict(), matchdict) + self.assertEquals( + prune_event(make_event_from_dict(evdict)).get_dict(), matchdict + ) def test_minimal(self): self.run_test( diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py index 1ec8c40901..e7d8699040 100644 --- a/tests/federation/test_federation_server.py +++ b/tests/federation/test_federation_server.py @@ -15,7 +15,7 @@ # limitations under the License. import logging -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.federation.federation_server import server_matches_acl_event from synapse.rest import admin from synapse.rest.client.v1 import login, room @@ -105,7 +105,7 @@ class StateQueryTests(unittest.FederatingHomeserverTestCase): def _create_acl_event(content): - return FrozenEvent( + return make_event_from_dict( { "room_id": "!a:b", "event_id": "$a:b", diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index a3aa0a1cf2..62b47f6574 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -160,6 +160,24 @@ class DeviceTestCase(unittest.HomeserverTestCase): res = self.get_success(self.handler.get_device(user1, "abc")) self.assertEqual(res["display_name"], "new display") + def test_update_device_too_long_display_name(self): + """Update a device with a display name that is invalid (too long).""" + self._record_users() + + # Request to update a device display name with a new value that is longer than allowed. + update = { + "display_name": "a" + * (synapse.handlers.device.MAX_DEVICE_DISPLAY_NAME_LEN + 1) + } + self.get_failure( + self.handler.update_device(user1, "abc", update), + synapse.api.errors.SynapseError, + ) + + # Ensure the display name was not updated. + res = self.get_success(self.handler.get_device(user1, "abc")) + self.assertEqual(res["display_name"], "display 2") + def test_update_unknown_device(self): update = {"display_name": "new_display"} res = self.handler.update_device("user_id", "unknown_device_id", update) diff --git a/tests/handlers/test_directory.py b/tests/handlers/test_directory.py index 91c7a17070..27b916aed4 100644 --- a/tests/handlers/test_directory.py +++ b/tests/handlers/test_directory.py @@ -18,25 +18,19 @@ from mock import Mock from twisted.internet import defer +import synapse.api.errors +from synapse.api.constants import EventTypes from synapse.config.room_directory import RoomDirectoryConfig -from synapse.handlers.directory import DirectoryHandler -from synapse.rest.client.v1 import directory, room -from synapse.types import RoomAlias +from synapse.rest.client.v1 import directory, login, room +from synapse.types import RoomAlias, create_requester from tests import unittest -from tests.utils import setup_test_homeserver -class DirectoryHandlers(object): - def __init__(self, hs): - self.directory_handler = DirectoryHandler(hs) - - -class DirectoryTestCase(unittest.TestCase): +class DirectoryTestCase(unittest.HomeserverTestCase): """ Tests the directory service. """ - @defer.inlineCallbacks - def setUp(self): + def make_homeserver(self, reactor, clock): self.mock_federation = Mock() self.mock_registry = Mock() @@ -47,14 +41,12 @@ class DirectoryTestCase(unittest.TestCase): self.mock_registry.register_query_handler = register_query_handler - hs = yield setup_test_homeserver( - self.addCleanup, + hs = self.setup_test_homeserver( http_client=None, resource_for_federation=Mock(), federation_client=self.mock_federation, federation_registry=self.mock_registry, ) - hs.handlers = DirectoryHandlers(hs) self.handler = hs.get_handlers().directory_handler @@ -64,23 +56,25 @@ class DirectoryTestCase(unittest.TestCase): self.your_room = RoomAlias.from_string("#your-room:test") self.remote_room = RoomAlias.from_string("#another:remote") - @defer.inlineCallbacks + return hs + def test_get_local_association(self): - yield self.store.create_room_alias_association( - self.my_room, "!8765qwer:test", ["test"] + self.get_success( + self.store.create_room_alias_association( + self.my_room, "!8765qwer:test", ["test"] + ) ) - result = yield self.handler.get_association(self.my_room) + result = self.get_success(self.handler.get_association(self.my_room)) self.assertEquals({"room_id": "!8765qwer:test", "servers": ["test"]}, result) - @defer.inlineCallbacks def test_get_remote_association(self): self.mock_federation.make_query.return_value = defer.succeed( {"room_id": "!8765qwer:test", "servers": ["test", "remote"]} ) - result = yield self.handler.get_association(self.remote_room) + result = self.get_success(self.handler.get_association(self.remote_room)) self.assertEquals( {"room_id": "!8765qwer:test", "servers": ["test", "remote"]}, result @@ -93,19 +87,168 @@ class DirectoryTestCase(unittest.TestCase): ignore_backoff=True, ) - @defer.inlineCallbacks + def test_delete_alias_not_allowed(self): + room_id = "!8765qwer:test" + self.get_success( + self.store.create_room_alias_association(self.my_room, room_id, ["test"]) + ) + + self.get_failure( + self.handler.delete_association( + create_requester("@user:test"), self.my_room + ), + synapse.api.errors.AuthError, + ) + + def test_delete_alias(self): + room_id = "!8765qwer:test" + user_id = "@user:test" + self.get_success( + self.store.create_room_alias_association( + self.my_room, room_id, ["test"], user_id + ) + ) + + result = self.get_success( + self.handler.delete_association(create_requester(user_id), self.my_room) + ) + self.assertEquals(room_id, result) + + # The alias should not be found. + self.get_failure( + self.handler.get_association(self.my_room), synapse.api.errors.SynapseError + ) + def test_incoming_fed_query(self): - yield self.store.create_room_alias_association( - self.your_room, "!8765asdf:test", ["test"] + self.get_success( + self.store.create_room_alias_association( + self.your_room, "!8765asdf:test", ["test"] + ) ) - response = yield self.query_handlers["directory"]( - {"room_alias": "#your-room:test"} + response = self.get_success( + self.handler.on_directory_query({"room_alias": "#your-room:test"}) ) self.assertEquals({"room_id": "!8765asdf:test", "servers": ["test"]}, response) +class CanonicalAliasTestCase(unittest.HomeserverTestCase): + """Test modifications of the canonical alias when delete aliases. + """ + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + directory.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + self.handler = hs.get_handlers().directory_handler + self.state_handler = hs.get_state_handler() + + # Create user + self.admin_user = self.register_user("admin", "pass", admin=True) + self.admin_user_tok = self.login("admin", "pass") + + # Create a test room + self.room_id = self.helper.create_room_as( + self.admin_user, tok=self.admin_user_tok + ) + + self.test_alias = "#test:test" + self.room_alias = RoomAlias.from_string(self.test_alias) + + # Create a new alias to this room. + self.get_success( + self.store.create_room_alias_association( + self.room_alias, self.room_id, ["test"], self.admin_user + ) + ) + + def test_remove_alias(self): + """Removing an alias that is the canonical alias should remove it there too.""" + # Set this new alias as the canonical alias for this room + self.helper.send_state( + self.room_id, + "m.room.canonical_alias", + {"alias": self.test_alias, "alt_aliases": [self.test_alias]}, + tok=self.admin_user_tok, + ) + + data = self.get_success( + self.state_handler.get_current_state( + self.room_id, EventTypes.CanonicalAlias, "" + ) + ) + self.assertEqual(data["content"]["alias"], self.test_alias) + self.assertEqual(data["content"]["alt_aliases"], [self.test_alias]) + + # Finally, delete the alias. + self.get_success( + self.handler.delete_association( + create_requester(self.admin_user), self.room_alias + ) + ) + + data = self.get_success( + self.state_handler.get_current_state( + self.room_id, EventTypes.CanonicalAlias, "" + ) + ) + self.assertNotIn("alias", data["content"]) + self.assertNotIn("alt_aliases", data["content"]) + + def test_remove_other_alias(self): + """Removing an alias listed as in alt_aliases should remove it there too.""" + # Create a second alias. + other_test_alias = "#test2:test" + other_room_alias = RoomAlias.from_string(other_test_alias) + self.get_success( + self.store.create_room_alias_association( + other_room_alias, self.room_id, ["test"], self.admin_user + ) + ) + + # Set the alias as the canonical alias for this room. + self.helper.send_state( + self.room_id, + "m.room.canonical_alias", + { + "alias": self.test_alias, + "alt_aliases": [self.test_alias, other_test_alias], + }, + tok=self.admin_user_tok, + ) + + data = self.get_success( + self.state_handler.get_current_state( + self.room_id, EventTypes.CanonicalAlias, "" + ) + ) + self.assertEqual(data["content"]["alias"], self.test_alias) + self.assertEqual( + data["content"]["alt_aliases"], [self.test_alias, other_test_alias] + ) + + # Delete the second alias. + self.get_success( + self.handler.delete_association( + create_requester(self.admin_user), other_room_alias + ) + ) + + data = self.get_success( + self.state_handler.get_current_state( + self.room_id, EventTypes.CanonicalAlias, "" + ) + ) + self.assertEqual(data["content"]["alias"], self.test_alias) + self.assertEqual(data["content"]["alt_aliases"], [self.test_alias]) + + class TestCreateAliasACL(unittest.HomeserverTestCase): user_id = "@test:test" diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index b4d92cf732..132e35651d 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -99,6 +99,7 @@ class FederationTestCase(unittest.HomeserverTestCase): user_id = self.register_user("kermit", "test") tok = self.login("kermit", "test") room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + room_version = self.get_success(self.store.get_room_version(room_id)) # pretend that another server has joined join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id) @@ -120,7 +121,7 @@ class FederationTestCase(unittest.HomeserverTestCase): "auth_events": [], "origin_server_ts": self.clock.time_msec(), }, - join_event.format_version, + room_version, ) with LoggingContext(request="send_rejected"): @@ -149,6 +150,7 @@ class FederationTestCase(unittest.HomeserverTestCase): user_id = self.register_user("kermit", "test") tok = self.login("kermit", "test") room_id = self.helper.create_room_as(room_creator=user_id, tok=tok) + room_version = self.get_success(self.store.get_room_version(room_id)) # pretend that another server has joined join_event = self._build_and_send_join_event(OTHER_SERVER, OTHER_USER, room_id) @@ -171,7 +173,7 @@ class FederationTestCase(unittest.HomeserverTestCase): "auth_events": [], "origin_server_ts": self.clock.time_msec(), }, - join_event.format_version, + room_version, ) with LoggingContext(request="send_rejected"): diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 68b9847bd2..2767b0497a 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -111,7 +111,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): retry_timings_res ) - self.datastore.get_device_updates_by_remote.return_value = (0, []) + self.datastore.get_device_updates_by_remote.return_value = defer.succeed( + (0, []) + ) def get_received_txn_response(*args): return defer.succeed(None) @@ -144,7 +146,9 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.datastore.get_current_state_deltas.return_value = (0, None) self.datastore.get_to_device_stream_token = lambda: 0 - self.datastore.get_new_device_msgs_for_remote = lambda *args, **kargs: ([], 0) + self.datastore.get_new_device_msgs_for_remote = lambda *args, **kargs: defer.succeed( + ([], 0) + ) self.datastore.delete_device_msgs_for_remote = lambda *args, **kargs: None self.datastore.set_received_txn_response = lambda *args, **kwargs: defer.succeed( None diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 26071059d2..0a4765fff4 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -147,6 +147,98 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): s = self.get_success(self.handler.search_users(u1, "user3", 10)) self.assertEqual(len(s["results"]), 0) + def test_spam_checker(self): + """ + A user which fails to the spam checks will not appear in search results. + """ + u1 = self.register_user("user1", "pass") + u1_token = self.login(u1, "pass") + u2 = self.register_user("user2", "pass") + u2_token = self.login(u2, "pass") + + # We do not add users to the directory until they join a room. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 0) + + room = self.helper.create_room_as(u1, is_public=False, tok=u1_token) + self.helper.invite(room, src=u1, targ=u2, tok=u1_token) + self.helper.join(room, user=u2, tok=u2_token) + + # Check we have populated the database correctly. + shares_private = self.get_users_who_share_private_rooms() + public_users = self.get_users_in_public_rooms() + + self.assertEqual( + self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + ) + self.assertEqual(public_users, []) + + # We get one search result when searching for user2 by user1. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 1) + + # Configure a spam checker that does not filter any users. + spam_checker = self.hs.get_spam_checker() + + class AllowAll(object): + def check_username_for_spam(self, user_profile): + # Allow all users. + return False + + spam_checker.spam_checker = AllowAll() + + # The results do not change: + # We get one search result when searching for user2 by user1. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 1) + + # Configure a spam checker that filters all users. + class BlockAll(object): + def check_username_for_spam(self, user_profile): + # All users are spammy. + return True + + spam_checker.spam_checker = BlockAll() + + # User1 now gets no search results for any of the other users. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 0) + + def test_legacy_spam_checker(self): + """ + A spam checker without the expected method should be ignored. + """ + u1 = self.register_user("user1", "pass") + u1_token = self.login(u1, "pass") + u2 = self.register_user("user2", "pass") + u2_token = self.login(u2, "pass") + + # We do not add users to the directory until they join a room. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 0) + + room = self.helper.create_room_as(u1, is_public=False, tok=u1_token) + self.helper.invite(room, src=u1, targ=u2, tok=u1_token) + self.helper.join(room, user=u2, tok=u2_token) + + # Check we have populated the database correctly. + shares_private = self.get_users_who_share_private_rooms() + public_users = self.get_users_in_public_rooms() + + self.assertEqual( + self._compress_shared(shares_private), set([(u1, u2, room), (u2, u1, room)]) + ) + self.assertEqual(public_users, []) + + # Configure a spam checker. + spam_checker = self.hs.get_spam_checker() + # The spam checker doesn't need any methods, so create a bare object. + spam_checker.spam_checker = object() + + # We get one search result when searching for user2 by user1. + s = self.get_success(self.handler.search_users(u1, "user2", 10)) + self.assertEqual(len(s["results"]), 1) + def _compress_shared(self, shared): """ Compress a list of users who share rooms dicts to a list of tuples. diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index b1b037006d..d31210fbe4 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -15,7 +15,7 @@ import logging from canonicaljson import encode_canonical_json -from synapse.events import FrozenEvent, _EventInternalMetadata +from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict from synapse.events.snapshot import EventContext from synapse.handlers.room import RoomEventSource from synapse.replication.slave.storage.events import SlavedEventStore @@ -90,7 +90,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): msg_dict["content"] = {} msg_dict["unsigned"]["redacted_by"] = redaction.event_id msg_dict["unsigned"]["redacted_because"] = redaction - redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + redacted = make_event_from_dict( + msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict() + ) self.check("get_event", [msg.event_id], redacted) def test_backfilled_redactions(self): @@ -110,7 +112,9 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): msg_dict["content"] = {} msg_dict["unsigned"]["redacted_by"] = redaction.event_id msg_dict["unsigned"]["redacted_because"] = redaction - redacted = FrozenEvent(msg_dict, msg.internal_metadata.get_dict()) + redacted = make_event_from_dict( + msg_dict, internal_metadata_dict=msg.internal_metadata.get_dict() + ) self.check("get_event", [msg.event_id], redacted) def test_invites(self): @@ -345,7 +349,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): if redacts is not None: event_dict["redacts"] = redacts - event = FrozenEvent(event_dict, internal_metadata_dict=internal) + event = make_event_from_dict(event_dict, internal_metadata_dict=internal) self.event_id += 1 diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 8f09f51c61..490ce8f55d 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -401,13 +401,35 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(403, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("You are not a server admin", channel.json_body["error"]) + def test_user_does_not_exist(self): + """ + Tests that a lookup for a user that does not exist returns a 404 + """ + self.hs.config.registration_shared_secret = None + + request, channel = self.make_request( + "GET", + "/_synapse/admin/v2/users/@unknown_person:test", + access_token=self.admin_user_tok, + ) + self.render(request) + + self.assertEqual(404, channel.code, msg=channel.json_body) + self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"]) + def test_requester_is_admin(self): """ If the user is a server admin, a new user is created. """ self.hs.config.registration_shared_secret = None - body = json.dumps({"password": "abc123", "admin": True}) + body = json.dumps( + { + "password": "abc123", + "admin": True, + "threepids": [{"medium": "email", "address": "bob@bob.bob"}], + } + ) # Create user request, channel = self.make_request( @@ -421,6 +443,8 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) self.assertEqual("bob", channel.json_body["displayname"]) + self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) + self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"]) # Get user request, channel = self.make_request( @@ -449,7 +473,13 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) # Modify user - body = json.dumps({"displayname": "foobar", "deactivated": True}) + body = json.dumps( + { + "displayname": "foobar", + "deactivated": True, + "threepids": [{"medium": "email", "address": "bob2@bob.bob"}], + } + ) request, channel = self.make_request( "PUT", @@ -463,6 +493,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("@bob:test", channel.json_body["name"]) self.assertEqual("foobar", channel.json_body["displayname"]) self.assertEqual(True, channel.json_body["deactivated"]) + # the user is deactivated, the threepid will be deleted # Get user request, channel = self.make_request( diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index e3af280ba6..fb681a1db9 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -1612,7 +1612,9 @@ class ContextTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, homeserver): self.user_id = self.register_user("user", "password") self.tok = self.login("user", "password") - self.room_id = self.helper.create_room_as(self.user_id, tok=self.tok) + self.room_id = self.helper.create_room_as( + self.user_id, tok=self.tok, is_public=False + ) self.other_user_id = self.register_user("user2", "password") self.other_tok = self.login("user2", "password") diff --git a/tests/state/test_v2.py b/tests/state/test_v2.py index 0f341d3ac3..5bafad9f19 100644 --- a/tests/state/test_v2.py +++ b/tests/state/test_v2.py @@ -22,7 +22,7 @@ import attr from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.api.room_versions import RoomVersions from synapse.event_auth import auth_types_for_event -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.state.v2 import lexicographical_topological_sort, resolve_events_with_store from synapse.types import EventID @@ -89,7 +89,7 @@ class FakeEvent(object): if self.state_key is not None: event_dict["state_key"] = self.state_key - return FrozenEvent(event_dict) + return make_event_from_dict(event_dict) # All graphs start with this set of events diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py index feb1c07cb2..b9ee6ec1ec 100644 --- a/tests/storage/test_redaction.py +++ b/tests/storage/test_redaction.py @@ -238,8 +238,11 @@ class RedactionTestCase(unittest.HomeserverTestCase): @defer.inlineCallbacks def build(self, prev_event_ids): built_event = yield self._base_builder.build(prev_event_ids) - built_event.event_id = self._event_id + + built_event._event_id = self._event_id built_event._event_dict["event_id"] = self._event_id + assert built_event.event_id == self._event_id + return built_event @property diff --git a/tests/test_event_auth.py b/tests/test_event_auth.py index ca20b085a2..bfa5d6f510 100644 --- a/tests/test_event_auth.py +++ b/tests/test_event_auth.py @@ -18,7 +18,7 @@ import unittest from synapse import event_auth from synapse.api.errors import AuthError from synapse.api.room_versions import RoomVersions -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict class EventAuthTestCase(unittest.TestCase): @@ -94,7 +94,7 @@ TEST_ROOM_ID = "!test:room" def _create_event(user_id): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), @@ -106,7 +106,7 @@ def _create_event(user_id): def _join_event(user_id): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), @@ -119,7 +119,7 @@ def _join_event(user_id): def _power_levels_event(sender, content): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), @@ -132,7 +132,7 @@ def _power_levels_event(sender, content): def _random_state_event(sender): - return FrozenEvent( + return make_event_from_dict( { "room_id": TEST_ROOM_ID, "event_id": _get_event_id(), diff --git a/tests/test_federation.py b/tests/test_federation.py index 68684460c6..9b5cf562f3 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -2,7 +2,7 @@ from mock import Mock from twisted.internet.defer import ensureDeferred, maybeDeferred, succeed -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.logging.context import LoggingContext from synapse.types import Requester, UserID from synapse.util import Clock @@ -43,7 +43,7 @@ class MessageAcceptTests(unittest.TestCase): ) )[0] - join_event = FrozenEvent( + join_event = make_event_from_dict( { "room_id": self.room_id, "sender": "@baduser:test.serv", @@ -105,7 +105,7 @@ class MessageAcceptTests(unittest.TestCase): )[0] # Now lie about an event - lying_event = FrozenEvent( + lying_event = make_event_from_dict( { "room_id": self.room_id, "sender": "@baduser:test.serv", diff --git a/tests/test_state.py b/tests/test_state.py index 1e4449fa1c..d1578fe581 100644 --- a/tests/test_state.py +++ b/tests/test_state.py @@ -20,7 +20,7 @@ from twisted.internet import defer from synapse.api.auth import Auth from synapse.api.constants import EventTypes, Membership from synapse.api.room_versions import RoomVersions -from synapse.events import FrozenEvent +from synapse.events import make_event_from_dict from synapse.events.snapshot import EventContext from synapse.state import StateHandler, StateResolutionHandler @@ -66,7 +66,7 @@ def create_event( d.update(kwargs) - event = FrozenEvent(d) + event = make_event_from_dict(d) return event diff --git a/tox.ini b/tox.ini index 88ef12bebd..b9132a3177 100644 --- a/tox.ini +++ b/tox.ini @@ -179,7 +179,10 @@ extras = all commands = mypy \ synapse/api \ synapse/config/ \ + synapse/events/spamcheck.py \ + synapse/federation/sender \ synapse/federation/transport \ + synapse/handlers/sync.py \ synapse/handlers/ui_auth \ synapse/logging/ \ synapse/module_api \ |