diff options
139 files changed, 1784 insertions, 1214 deletions
diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md index 6c3a998499..0dfab4e087 100644 --- a/.github/PULL_REQUEST_TEMPLATE.md +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -8,6 +8,7 @@ - Use markdown where necessary, mostly for `code blocks`. - End with either a period (.) or an exclamation mark (!). - Start with a capital letter. + - Feel free to credit yourself, by adding a sentence "Contributed by @github_username." or "Contributed by [Your Name]." to the end of the entry. * [ ] Pull request includes a [sign off](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#sign-off) * [ ] [Code style](https://matrix-org.github.io/synapse/latest/code_style.html) is correct (run the [linters](https://matrix-org.github.io/synapse/latest/development/contributing_guide.html#run-the-linters)) diff --git a/.github/workflows/tests.yml b/.github/workflows/tests.yml index cb72e1a233..4f58069702 100644 --- a/.github/workflows/tests.yml +++ b/.github/workflows/tests.yml @@ -366,6 +366,8 @@ jobs: # Build initial Synapse image - run: docker build -t matrixdotorg/synapse:latest -f docker/Dockerfile . working-directory: synapse + env: + DOCKER_BUILDKIT: 1 # Build a ready-to-run Synapse image based on the initial image above. # This new image includes a config file, keys for signing and TLS, and @@ -374,7 +376,8 @@ jobs: working-directory: complement/dockerfiles # Run Complement - - run: go test -v -tags synapse_blacklist,msc2403 ./tests/... + - run: set -o pipefail && go test -v -json -tags synapse_blacklist,msc2403 ./tests/... 
2>&1 | gotestfmt + shell: bash env: COMPLEMENT_BASE_IMAGE: complement-synapse:latest working-directory: complement diff --git a/.gitignore b/.gitignore index fe137f3370..8eb4eda73d 100644 --- a/.gitignore +++ b/.gitignore @@ -50,3 +50,7 @@ __pycache__/ # docs book/ + +# complement +/complement-master +/master.tar.gz diff --git a/changelog.d/11530.bugfix b/changelog.d/11530.bugfix new file mode 100644 index 0000000000..7ea9ba4e49 --- /dev/null +++ b/changelog.d/11530.bugfix @@ -0,0 +1,2 @@ +Fix a long-standing issue which could cause Synapse to incorrectly accept data in the unsigned field of events +received over federation. \ No newline at end of file diff --git a/changelog.d/11561.feature b/changelog.d/11561.feature new file mode 100644 index 0000000000..19dada883b --- /dev/null +++ b/changelog.d/11561.feature @@ -0,0 +1 @@ +Add `track_puppeted_user_ips` config flag to track puppeted user IP addresses. This also includes them in monthly active user counts. diff --git a/changelog.d/11576.feature b/changelog.d/11576.feature new file mode 100644 index 0000000000..5be836ae02 --- /dev/null +++ b/changelog.d/11576.feature @@ -0,0 +1 @@ +Remove the `"password_hash"` field from the response dictionaries of the [Users Admin API](https://matrix-org.github.io/synapse/latest/admin_api/user_admin_api.html). \ No newline at end of file diff --git a/changelog.d/11587.bugfix b/changelog.d/11587.bugfix new file mode 100644 index 0000000000..ad2b83edf7 --- /dev/null +++ b/changelog.d/11587.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where Synapse wouldn't cache a response indicating that a remote user has no devices. \ No newline at end of file diff --git a/changelog.d/11593.bugfix b/changelog.d/11593.bugfix new file mode 100644 index 0000000000..963fd0e58e --- /dev/null +++ b/changelog.d/11593.bugfix @@ -0,0 +1 @@ +Fix an error that occurs when trying to get the federation status of a destination server even if no error has occurred. This admin API was newly introduced in Synapse 1.49.0. 
diff --git a/changelog.d/11612.misc b/changelog.d/11612.misc new file mode 100644 index 0000000000..2d886169c5 --- /dev/null +++ b/changelog.d/11612.misc @@ -0,0 +1 @@ +Avoid database access in the JSON serialization process. diff --git a/changelog.d/11659.bugfix b/changelog.d/11659.bugfix new file mode 100644 index 0000000000..842f6892fd --- /dev/null +++ b/changelog.d/11659.bugfix @@ -0,0 +1 @@ +Include the bundled aggregations in the `/sync` response, per [MSC2675](https://github.com/matrix-org/matrix-doc/pull/2675). diff --git a/changelog.d/11667.bugfix b/changelog.d/11667.bugfix new file mode 100644 index 0000000000..bf65fd4c8b --- /dev/null +++ b/changelog.d/11667.bugfix @@ -0,0 +1 @@ +Fix `/_matrix/client/v1/room/{roomId}/hierarchy` endpoint returning incorrect fields which have been present since Synapse 1.49.0. diff --git a/changelog.d/11672.feature b/changelog.d/11672.feature new file mode 100644 index 0000000000..ce8b3e9547 --- /dev/null +++ b/changelog.d/11672.feature @@ -0,0 +1 @@ +Return an `M_FORBIDDEN` error code instead of `M_UNKNOWN` when a spam checker module prevents a user from creating a room. diff --git a/changelog.d/11682.removal b/changelog.d/11682.removal new file mode 100644 index 0000000000..50bdf35b20 --- /dev/null +++ b/changelog.d/11682.removal @@ -0,0 +1 @@ +Remove the unstable `/send_relation` endpoint. diff --git a/changelog.d/11685.misc b/changelog.d/11685.misc new file mode 100644 index 0000000000..c4566b2012 --- /dev/null +++ b/changelog.d/11685.misc @@ -0,0 +1 @@ +Run `pyupgrade --py37-plus --keep-percent-format` on Synapse. diff --git a/changelog.d/11691.misc b/changelog.d/11691.misc new file mode 100644 index 0000000000..383d0b3064 --- /dev/null +++ b/changelog.d/11691.misc @@ -0,0 +1 @@ +Use buildkit's cache feature to speed up docker builds. 
diff --git a/changelog.d/11692.misc b/changelog.d/11692.misc new file mode 100644 index 0000000000..0cdfca54e7 --- /dev/null +++ b/changelog.d/11692.misc @@ -0,0 +1 @@ +Use `auto_attribs` and native type hints for attrs classes. diff --git a/changelog.d/11693.misc b/changelog.d/11693.misc new file mode 100644 index 0000000000..521a1796b8 --- /dev/null +++ b/changelog.d/11693.misc @@ -0,0 +1 @@ +Remove debug logging for #4422, which has been closed since Synapse 0.99. \ No newline at end of file diff --git a/changelog.d/11695.bugfix b/changelog.d/11695.bugfix new file mode 100644 index 0000000000..7799aefb82 --- /dev/null +++ b/changelog.d/11695.bugfix @@ -0,0 +1 @@ +Fix a bug where only the first 50 rooms from a space were returned from the `/hierarchy` API. This has existed since the introduction of the API in Synapse v1.41.0. diff --git a/changelog.d/11699.misc b/changelog.d/11699.misc new file mode 100644 index 0000000000..ffae5f2960 --- /dev/null +++ b/changelog.d/11699.misc @@ -0,0 +1 @@ +Remove fallback code for Python 2. diff --git a/changelog.d/11701.misc b/changelog.d/11701.misc new file mode 100644 index 0000000000..68905e0412 --- /dev/null +++ b/changelog.d/11701.misc @@ -0,0 +1 @@ +Add a test for [an edge case](https://github.com/matrix-org/synapse/pull/11532#discussion_r769104461) in the `/sync` logic. \ No newline at end of file diff --git a/changelog.d/11702.misc b/changelog.d/11702.misc new file mode 100644 index 0000000000..fc1069cae0 --- /dev/null +++ b/changelog.d/11702.misc @@ -0,0 +1 @@ +Add the option to write sqlite test dbs to disk when running tests. \ No newline at end of file diff --git a/changelog.d/11707.misc b/changelog.d/11707.misc new file mode 100644 index 0000000000..ef1e01cac8 --- /dev/null +++ b/changelog.d/11707.misc @@ -0,0 +1 @@ +Improve Complement test output for GitHub Actions. 
diff --git a/changelog.d/11710.bugfix b/changelog.d/11710.bugfix new file mode 100644 index 0000000000..6521a37f6e --- /dev/null +++ b/changelog.d/11710.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse v1.18.0 where password reset and address validation emails would not be sent if their subject was configured to use the 'app' template variable. Contributed by @br4nnigan. diff --git a/changelog.d/11714.misc b/changelog.d/11714.misc new file mode 100644 index 0000000000..7f39bf0e3d --- /dev/null +++ b/changelog.d/11714.misc @@ -0,0 +1 @@ +Fix a typechecker problem related to our (ab)use of `nacl.signing.SigningKey`s. \ No newline at end of file diff --git a/changelog.d/11715.doc b/changelog.d/11715.doc new file mode 100644 index 0000000000..32b7c10b0b --- /dev/null +++ b/changelog.d/11715.doc @@ -0,0 +1 @@ +Document the new `SYNAPSE_TEST_PERSIST_SQLITE_DB` environment variable in the contributing guide. diff --git a/changelog.d/11716.misc b/changelog.d/11716.misc new file mode 100644 index 0000000000..08f7310498 --- /dev/null +++ b/changelog.d/11716.misc @@ -0,0 +1 @@ +Fix docstring on `add_account_data_for_user`. \ No newline at end of file diff --git a/changelog.d/11718.misc b/changelog.d/11718.misc new file mode 100644 index 0000000000..91dc5b5874 --- /dev/null +++ b/changelog.d/11718.misc @@ -0,0 +1 @@ +Complement environment variable name change and update `.gitignore`. diff --git a/changelog.d/11723.misc b/changelog.d/11723.misc new file mode 100644 index 0000000000..f99e02070a --- /dev/null +++ b/changelog.d/11723.misc @@ -0,0 +1 @@ +Simplify calculation of prometheus metrics for garbage collection. diff --git a/changelog.d/11725.doc b/changelog.d/11725.doc new file mode 100644 index 0000000000..46eb9b814f --- /dev/null +++ b/changelog.d/11725.doc @@ -0,0 +1 @@ +Document that now the minimum supported PostgreSQL version is 10. 
diff --git a/changelog.d/11735.doc b/changelog.d/11735.doc new file mode 100644 index 0000000000..d8822f6b52 --- /dev/null +++ b/changelog.d/11735.doc @@ -0,0 +1 @@ +Fix typo in demo docs: differnt. diff --git a/changelog.d/11739.doc b/changelog.d/11739.doc new file mode 100644 index 0000000000..3d64f473f5 --- /dev/null +++ b/changelog.d/11739.doc @@ -0,0 +1 @@ +Update room spec url in config files. \ No newline at end of file diff --git a/changelog.d/11740.doc b/changelog.d/11740.doc new file mode 100644 index 0000000000..dce080a5e9 --- /dev/null +++ b/changelog.d/11740.doc @@ -0,0 +1 @@ +Mention python3-venv and libpq-dev dependencies in contribution guide. diff --git a/changelog.d/11742.misc b/changelog.d/11742.misc new file mode 100644 index 0000000000..f65ccdf30a --- /dev/null +++ b/changelog.d/11742.misc @@ -0,0 +1 @@ +Minor efficiency improvements when inserting many values into the database. diff --git a/changelog.d/11744.misc b/changelog.d/11744.misc new file mode 100644 index 0000000000..b7df14657a --- /dev/null +++ b/changelog.d/11744.misc @@ -0,0 +1 @@ +Invite PR authors to give themselves credit in the changelog. diff --git a/changelog.d/11745.bugfix b/changelog.d/11745.bugfix new file mode 100644 index 0000000000..6521a37f6e --- /dev/null +++ b/changelog.d/11745.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in Synapse v1.18.0 where password reset and address validation emails would not be sent if their subject was configured to use the 'app' template variable. Contributed by @br4nnigan. diff --git a/changelog.d/11749.feature b/changelog.d/11749.feature new file mode 100644 index 0000000000..19dada883b --- /dev/null +++ b/changelog.d/11749.feature @@ -0,0 +1 @@ +Add `track_puppeted_user_ips` config flag to track puppeted user IP addresses. This also includes them in monthly active user counts. 
diff --git a/demo/README b/demo/README index 0bec820ad6..a5a95bd196 100644 --- a/demo/README +++ b/demo/README @@ -22,5 +22,5 @@ Logs and sqlitedb will be stored in demo/808{0,1,2}.{log,db} -Also note that when joining a public room on a differnt HS via "#foo:bar.net", then you are (in the current impl) joining a room with room_id "foo". This means that it won't work if your HS already has a room with that name. +Also note that when joining a public room on a different HS via "#foo:bar.net", then you are (in the current impl) joining a room with room_id "foo". This means that it won't work if your HS already has a room with that name. diff --git a/docker/Dockerfile b/docker/Dockerfile index 2bdc607e66..306f75ae56 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -1,14 +1,17 @@ # Dockerfile to build the matrixdotorg/synapse docker images. # +# Note that it uses features which are only available in BuildKit - see +# https://docs.docker.com/go/buildkit/ for more information. +# # To build the image, run `docker build` command from the root of the # synapse repository: # -# docker build -f docker/Dockerfile . +# DOCKER_BUILDKIT=1 docker build -f docker/Dockerfile . # # There is an optional PYTHON_VERSION build argument which sets the # version of python to build against: for example: # -# docker build -f docker/Dockerfile --build-arg PYTHON_VERSION=3.6 . +# DOCKER_BUILDKIT=1 docker build -f docker/Dockerfile --build-arg PYTHON_VERSION=3.9 . # ARG PYTHON_VERSION=3.8 @@ -19,7 +22,16 @@ ARG PYTHON_VERSION=3.8 FROM docker.io/python:${PYTHON_VERSION}-slim as builder # install the OS build deps -RUN apt-get update && apt-get install -y \ +# +# RUN --mount is specific to buildkit and is documented at +# https://github.com/moby/buildkit/blob/master/frontend/dockerfile/docs/syntax.md#build-mounts-run---mount. +# Here we use it to set up a cache for apt, to improve rebuild speeds on +# slow connections. 
+# +RUN \ + --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt-get update && apt-get install -y \ build-essential \ libffi-dev \ libjpeg-dev \ @@ -44,7 +56,8 @@ COPY synapse/python_dependencies.py /synapse/synapse/python_dependencies.py # used while you develop on the source # # This is aiming at installing the `install_requires` and `extras_require` from `setup.py` -RUN pip install --prefix="/install" --no-warn-script-location \ +RUN --mount=type=cache,target=/root/.cache/pip \ + pip install --prefix="/install" --no-warn-script-location \ /synapse[all] # Copy over the rest of the project @@ -66,7 +79,10 @@ LABEL org.opencontainers.image.documentation='https://github.com/matrix-org/syna LABEL org.opencontainers.image.source='https://github.com/matrix-org/synapse.git' LABEL org.opencontainers.image.licenses='Apache-2.0' -RUN apt-get update && apt-get install -y \ +RUN \ + --mount=type=cache,target=/var/cache/apt,sharing=locked \ + --mount=type=cache,target=/var/lib/apt,sharing=locked \ + apt-get update && apt-get install -y \ curl \ gosu \ libjpeg62-turbo \ diff --git a/docs/admin_api/user_admin_api.md b/docs/admin_api/user_admin_api.md index 74933d2fcf..c514cadb9d 100644 --- a/docs/admin_api/user_admin_api.md +++ b/docs/admin_api/user_admin_api.md @@ -15,9 +15,10 @@ server admin: [Admin API](../usage/administration/admin_api) It returns a JSON body like the following: -```json +```jsonc { - "displayname": "User", + "name": "@user:example.com", + "displayname": "User", // can be null if not set "threepids": [ { "medium": "email", @@ -32,11 +33,11 @@ It returns a JSON body like the following: "validated_at": 1586458409743 } ], - "avatar_url": "<avatar_url>", + "avatar_url": "<avatar_url>", // can be null if not set + "is_guest": 0, "admin": 0, "deactivated": 0, "shadow_banned": 0, - "password_hash": "$2b$12$p9B4GkqYdRTPGD", "creation_ts": 1560432506, "appservice_id": null, 
"consent_server_notice_sent": null, diff --git a/docs/development/contributing_guide.md b/docs/development/contributing_guide.md index abdb808438..c142981693 100644 --- a/docs/development/contributing_guide.md +++ b/docs/development/contributing_guide.md @@ -20,7 +20,9 @@ recommended for development. More information about WSL can be found at <https://docs.microsoft.com/en-us/windows/wsl/install>. Running Synapse natively on Windows is not officially supported. -The code of Synapse is written in Python 3. To do pretty much anything, you'll need [a recent version of Python 3](https://wiki.python.org/moin/BeginnersGuide/Download). +The code of Synapse is written in Python 3. To do pretty much anything, you'll need [a recent version of Python 3](https://www.python.org/downloads/). Your Python also needs support for [virtual environments](https://docs.python.org/3/library/venv.html). This is usually built-in, but some Linux distributions like Debian and Ubuntu split it out into its own package. Running `sudo apt install python3-venv` should be enough. + +Synapse can connect to PostgreSQL via the [psycopg2](https://pypi.org/project/psycopg2/) Python library. Building this library from source requires access to PostgreSQL's C header files. On Debian or Ubuntu Linux, these can be installed with `sudo apt install libpq-dev`. The source code of Synapse is hosted on GitHub. You will also need [a recent version of git](https://github.com/git-guides/install-git). @@ -169,6 +171,27 @@ To increase the log level for the tests, set `SYNAPSE_TEST_LOG_LEVEL`: SYNAPSE_TEST_LOG_LEVEL=DEBUG trial tests ``` +By default, tests will use an in-memory SQLite database for test data. For additional +help with debugging, one can use an on-disk SQLite database file instead, in order to +review database state during and after running tests. This can be done by setting +the `SYNAPSE_TEST_PERSIST_SQLITE_DB` environment variable. 
Doing so will cause the +database state to be stored in a file named `test.db` under the trial process' +working directory. Typically, this ends up being `_trial_temp/test.db`. For example: + +```sh +SYNAPSE_TEST_PERSIST_SQLITE_DB=1 trial tests +``` + +The database file can then be inspected with: + +```sh +sqlite3 _trial_temp/test.db +``` + +Note that the database file is cleared at the beginning of each test run. Thus it +will always only contain the data generated by the *last run test*. Though generally +when debugging, one is only running a single test anyway. + ### Running tests under PostgreSQL Invoking `trial` as above will use an in-memory SQLite database. This is great for diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 810a14b077..9a501167ee 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -164,7 +164,7 @@ presence: # The default room version for newly created rooms. # # Known room versions are listed here: -# https://matrix.org/docs/spec/#complete-list-of-room-versions +# https://spec.matrix.org/latest/rooms/#complete-list-of-room-versions # # For example, for room version 1, default_room_version should be set # to "1". @@ -1503,6 +1503,13 @@ room_prejoin_state: #additional_event_types: # - org.example.custom.event.type +# By default when puppeting another user, the user who has created the +# access token for puppeting is tracked. If this is enabled, both +# requests are tracked. Implicitly enables MAU tracking for puppeted users. +# Uncomment to also track puppeted user IP's. 
+# +#track_puppeted_user_ips: true + # A list of application service config files to use # diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh index 53295b58fc..67a22d3ed3 100755 --- a/scripts-dev/complement.sh +++ b/scripts-dev/complement.sh @@ -23,6 +23,9 @@ # Exit if a line returns a non-zero exit code set -e +# enable buildkit for the docker builds +export DOCKER_BUILDKIT=1 + # Change to the repository root cd "$(dirname $0)/.." @@ -47,7 +50,7 @@ if [[ -n "$WORKERS" ]]; then COMPLEMENT_DOCKERFILE=SynapseWorkers.Dockerfile # And provide some more configuration to complement. export COMPLEMENT_CA=true - export COMPLEMENT_VERSION_CHECK_ITERATIONS=500 + export COMPLEMENT_SPAWN_HS_TIMEOUT_SECS=25 else export COMPLEMENT_BASE_IMAGE=complement-synapse COMPLEMENT_DOCKERFILE=Synapse.Dockerfile @@ -65,4 +68,5 @@ if [[ -n "$1" ]]; then fi # Run the tests! +echo "Images built; running complement" go test -v -tags synapse_blacklist,msc2403 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests/... 
diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 4a32d430bd..683241201c 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -71,6 +71,7 @@ class Auth: self._auth_blocking = AuthBlocking(self.hs) self._track_appservice_user_ips = hs.config.appservice.track_appservice_user_ips + self._track_puppeted_user_ips = hs.config.api.track_puppeted_user_ips self._macaroon_secret_key = hs.config.key.macaroon_secret_key self._force_tracing_for_users = hs.config.tracing.force_tracing_for_users @@ -246,6 +247,18 @@ class Auth: user_agent=user_agent, device_id=device_id, ) + # Track also the puppeted user client IP if enabled and the user is puppeting + if ( + user_info.user_id != user_info.token_owner + and self._track_puppeted_user_ips + ): + await self.store.insert_client_ip( + user_id=user_info.user_id, + access_token=access_token, + ip=ip_addr, + user_agent=user_agent, + device_id=device_id, + ) if is_guest and not allow_guest: raise AuthError( diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py index 0a895bba48..a747a40814 100644 --- a/synapse/api/room_versions.py +++ b/synapse/api/room_versions.py @@ -46,41 +46,41 @@ class RoomDisposition: UNSTABLE = "unstable" -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class RoomVersion: """An object which describes the unique attributes of a room version.""" - identifier = attr.ib(type=str) # the identifier for this version - disposition = attr.ib(type=str) # one of the RoomDispositions - event_format = attr.ib(type=int) # one of the EventFormatVersions - state_res = attr.ib(type=int) # one of the StateResolutionVersions - enforce_key_validity = attr.ib(type=bool) + identifier: str # the identifier for this version + disposition: str # one of the RoomDispositions + event_format: int # one of the EventFormatVersions + state_res: int # one of the StateResolutionVersions + enforce_key_validity: bool # Before MSC2432, m.room.aliases had special auth rules 
and redaction rules - special_case_aliases_auth = attr.ib(type=bool) + special_case_aliases_auth: bool # Strictly enforce canonicaljson, do not allow: # * Integers outside the range of [-2 ^ 53 + 1, 2 ^ 53 - 1] # * Floats # * NaN, Infinity, -Infinity - strict_canonicaljson = attr.ib(type=bool) + strict_canonicaljson: bool # MSC2209: Check 'notifications' key while verifying # m.room.power_levels auth rules. - limit_notifications_power_levels = attr.ib(type=bool) + limit_notifications_power_levels: bool # MSC2174/MSC2176: Apply updated redaction rules algorithm. - msc2176_redaction_rules = attr.ib(type=bool) + msc2176_redaction_rules: bool # MSC3083: Support the 'restricted' join_rule. - msc3083_join_rules = attr.ib(type=bool) + msc3083_join_rules: bool # MSC3375: Support for the proper redaction rules for MSC3083. This mustn't # be enabled if MSC3083 is not. - msc3375_redaction_rules = attr.ib(type=bool) + msc3375_redaction_rules: bool # MSC2403: Allows join_rules to be set to 'knock', changes auth rules to allow sending # m.room.membership event with membership 'knock'. 
- msc2403_knocking = attr.ib(type=bool) + msc2403_knocking: bool # MSC2716: Adds m.room.power_levels -> content.historical field to control # whether "insertion", "chunk", "marker" events can be sent - msc2716_historical = attr.ib(type=bool) + msc2716_historical: bool # MSC2716: Adds support for redacting "insertion", "chunk", and "marker" events - msc2716_redactions = attr.ib(type=bool) + msc2716_redactions: bool class RoomVersions: diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 5fc59c1be1..579adbbca0 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -60,7 +60,7 @@ from synapse.events.spamcheck import load_legacy_spam_checkers from synapse.events.third_party_rules import load_legacy_third_party_event_rules from synapse.handlers.auth import load_legacy_password_auth_providers from synapse.logging.context import PreserveLoggingContext -from synapse.metrics import register_threadpool +from synapse.metrics import install_gc_manager, register_threadpool from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.metrics.jemalloc import setup_jemalloc_stats from synapse.types import ISynapseReactor @@ -159,6 +159,7 @@ def start_reactor( change_resource_limit(soft_file_limit) if gc_thresholds: gc.set_threshold(*gc_thresholds) + install_gc_manager() run_command() # make sure that we run the reactor with the sentinel log context, diff --git a/synapse/config/api.py b/synapse/config/api.py index 25538b82d5..f8e52150a2 100644 --- a/synapse/config/api.py +++ b/synapse/config/api.py @@ -29,6 +29,7 @@ class ApiConfig(Config): def read_config(self, config: JsonDict, **kwargs): validate_config(_MAIN_SCHEMA, config, ()) self.room_prejoin_state = list(self._get_prejoin_state_types(config)) + self.track_puppeted_user_ips = config.get("track_puppeted_user_ips", False) def generate_config_section(cls, **kwargs) -> str: formatted_default_state_types = "\n".join( @@ -59,6 +60,13 @@ class ApiConfig(Config): # 
#additional_event_types: # - org.example.custom.event.type + + # By default when puppeting another user, the user who has created the + # access token for puppeting is tracked. If this is enabled, both + # requests are tracked. Implicitly enables MAU tracking for puppeted users. + # Uncomment to also track puppeted user IP's. + # + #track_puppeted_user_ips: true """ % { "formatted_default_state_types": formatted_default_state_types } @@ -138,5 +146,8 @@ _MAIN_SCHEMA = { "properties": { "room_prejoin_state": _ROOM_PREJOIN_STATE_CONFIG_SCHEMA, "room_invite_state_types": _ROOM_INVITE_STATE_TYPES_SCHEMA, + "track_puppeted_user_ips": { + "type": "boolean", + }, }, } diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index 510b647c63..949d7dd5ac 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -55,19 +55,19 @@ https://matrix-org.github.io/synapse/latest/templates.html ---------------------------------------------------------------------------------------""" -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EmailSubjectConfig: - message_from_person_in_room = attr.ib(type=str) - message_from_person = attr.ib(type=str) - messages_from_person = attr.ib(type=str) - messages_in_room = attr.ib(type=str) - messages_in_room_and_others = attr.ib(type=str) - messages_from_person_and_others = attr.ib(type=str) - invite_from_person = attr.ib(type=str) - invite_from_person_to_room = attr.ib(type=str) - invite_from_person_to_space = attr.ib(type=str) - password_reset = attr.ib(type=str) - email_validation = attr.ib(type=str) + message_from_person_in_room: str + message_from_person: str + messages_from_person: str + messages_in_room: str + messages_in_room_and_others: str + messages_from_person_and_others: str + invite_from_person: str + invite_from_person_to_room: str + invite_from_person_to_space: str + password_reset: str + email_validation: str class EmailConfig(Config): diff --git 
a/synapse/config/server.py b/synapse/config/server.py index 1de2dea9b0..5010266b69 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -200,8 +200,8 @@ class HttpListenerConfig: """Object describing the http-specific parts of the config of a listener""" x_forwarded: bool = False - resources: List[HttpResourceConfig] = attr.ib(factory=list) - additional_resources: Dict[str, dict] = attr.ib(factory=dict) + resources: List[HttpResourceConfig] = attr.Factory(list) + additional_resources: Dict[str, dict] = attr.Factory(dict) tag: Optional[str] = None @@ -883,7 +883,7 @@ class ServerConfig(Config): # The default room version for newly created rooms. # # Known room versions are listed here: - # https://matrix.org/docs/spec/#complete-list-of-room-versions + # https://spec.matrix.org/latest/rooms/#complete-list-of-room-versions # # For example, for room version 1, default_room_version should be set # to "1". diff --git a/synapse/config/workers.py b/synapse/config/workers.py index 576f519188..bdaba6db37 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -51,12 +51,12 @@ def _instance_to_list_converter(obj: Union[str, List[str]]) -> List[str]: return obj -@attr.s +@attr.s(auto_attribs=True) class InstanceLocationConfig: """The host and port to talk to an instance via HTTP replication.""" - host = attr.ib(type=str) - port = attr.ib(type=int) + host: str + port: int @attr.s @@ -77,34 +77,28 @@ class WriterLocations: can only be a single instance. 
""" - events = attr.ib( + events: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - typing = attr.ib( + typing: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - to_device = attr.ib( + to_device: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - account_data = attr.ib( + account_data: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - receipts = attr.ib( + receipts: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) - presence = attr.ib( + presence: List[str] = attr.ib( default=["master"], - type=List[str], converter=_instance_to_list_converter, ) diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 993b04099e..72d4a69aac 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -58,7 +58,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -@attr.s(slots=True, cmp=False) +@attr.s(slots=True, frozen=True, cmp=False, auto_attribs=True) class VerifyJsonRequest: """ A request to verify a JSON object. @@ -78,10 +78,10 @@ class VerifyJsonRequest: key_ids: The set of key_ids to that could be used to verify the JSON object """ - server_name = attr.ib(type=str) - get_json_object = attr.ib(type=Callable[[], JsonDict]) - minimum_valid_until_ts = attr.ib(type=int) - key_ids = attr.ib(type=List[str]) + server_name: str + get_json_object: Callable[[], JsonDict] + minimum_valid_until_ts: int + key_ids: List[str] @staticmethod def from_json_object( @@ -124,7 +124,7 @@ class KeyLookupError(ValueError): pass -@attr.s(slots=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _FetchKeyRequest: """A request for keys for a given server. 
@@ -138,9 +138,9 @@ class _FetchKeyRequest: key_ids: The IDs of the keys to attempt to fetch """ - server_name = attr.ib(type=str) - minimum_valid_until_ts = attr.ib(type=int) - key_ids = attr.ib(type=List[str]) + server_name: str + minimum_valid_until_ts: int + key_ids: List[str] class Keyring: diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f251402ed8..0eab1aefd6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -28,7 +28,7 @@ if TYPE_CHECKING: from synapse.storage.databases.main import DataStore -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class EventContext: """ Holds information relevant to persisting an event @@ -103,15 +103,15 @@ class EventContext: accessed via get_prev_state_ids. """ - rejected = attr.ib(default=False, type=Union[bool, str]) - _state_group = attr.ib(default=None, type=Optional[int]) - state_group_before_event = attr.ib(default=None, type=Optional[int]) - prev_group = attr.ib(default=None, type=Optional[int]) - delta_ids = attr.ib(default=None, type=Optional[StateMap[str]]) - app_service = attr.ib(default=None, type=Optional[ApplicationService]) + rejected: Union[bool, str] = False + _state_group: Optional[int] = None + state_group_before_event: Optional[int] = None + prev_group: Optional[int] = None + delta_ids: Optional[StateMap[str]] = None + app_service: Optional[ApplicationService] = None - _current_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) - _prev_state_ids = attr.ib(default=None, type=Optional[StateMap[str]]) + _current_state_ids: Optional[StateMap[str]] = None + _prev_state_ids: Optional[StateMap[str]] = None @staticmethod def with_state( diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 2038e72924..de0e0c1731 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -14,17 +14,7 @@ # limitations under the License. 
import collections.abc import re -from typing import ( - TYPE_CHECKING, - Any, - Callable, - Dict, - Iterable, - List, - Mapping, - Optional, - Union, -) +from typing import Any, Callable, Dict, Iterable, List, Mapping, Optional, Union from frozendict import frozendict @@ -32,14 +22,10 @@ from synapse.api.constants import EventContentFields, EventTypes, RelationTypes from synapse.api.errors import Codes, SynapseError from synapse.api.room_versions import RoomVersion from synapse.types import JsonDict -from synapse.util.async_helpers import yieldable_gather_results from synapse.util.frozenutils import unfreeze from . import EventBase -if TYPE_CHECKING: - from synapse.server import HomeServer - # Split strings on "." but not "\." This uses a negative lookbehind assertion for '\' # (?<!stuff) matches if the current position in the string is not preceded # by a match for 'stuff'. @@ -385,17 +371,12 @@ class EventClientSerializer: clients. """ - def __init__(self, hs: "HomeServer"): - self.store = hs.get_datastore() - self._msc1849_enabled = hs.config.experimental.msc1849_enabled - self._msc3440_enabled = hs.config.experimental.msc3440_enabled - - async def serialize_event( + def serialize_event( self, event: Union[JsonDict, EventBase], time_now: int, *, - bundle_aggregations: bool = False, + bundle_aggregations: Optional[Dict[str, JsonDict]] = None, **kwargs: Any, ) -> JsonDict: """Serializes a single event. @@ -418,66 +399,41 @@ class EventClientSerializer: serialized_event = serialize_event(event, time_now, **kwargs) # Check if there are any bundled aggregations to include with the event. - # - # Do not bundle aggregations if any of the following at true: - # - # * Support is disabled via the configuration or the caller. - # * The event is a state event. - # * The event has been redacted. 
- if ( - self._msc1849_enabled - and bundle_aggregations - and not event.is_state() - and not event.internal_metadata.is_redacted() - ): - await self._injected_bundled_aggregations(event, time_now, serialized_event) + if bundle_aggregations: + event_aggregations = bundle_aggregations.get(event.event_id) + if event_aggregations: + self._injected_bundled_aggregations( + event, + time_now, + bundle_aggregations[event.event_id], + serialized_event, + ) return serialized_event - async def _injected_bundled_aggregations( - self, event: EventBase, time_now: int, serialized_event: JsonDict + def _injected_bundled_aggregations( + self, + event: EventBase, + time_now: int, + aggregations: JsonDict, + serialized_event: JsonDict, ) -> None: """Potentially injects bundled aggregations into the unsigned portion of the serialized event. Args: event: The event being serialized. time_now: The current time in milliseconds + aggregations: The bundled aggregation to serialize. serialized_event: The serialized event which may be modified. """ - # Do not bundle aggregations for an event which represents an edit or an - # annotation. It does not make sense for them to have related events. - relates_to = event.content.get("m.relates_to") - if isinstance(relates_to, (dict, frozendict)): - relation_type = relates_to.get("rel_type") - if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): - return - - event_id = event.event_id - room_id = event.room_id - - # The bundled aggregations to include. - aggregations = {} - - annotations = await self.store.get_aggregation_groups_for_event( - event_id, room_id - ) - if annotations.chunk: - aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + # Make a copy in-case the object is cached. 
+ aggregations = aggregations.copy() - references = await self.store.get_relations_for_event( - event_id, room_id, RelationTypes.REFERENCE, direction="f" - ) - if references.chunk: - aggregations[RelationTypes.REFERENCE] = references.to_dict() - - edit = None - if event.type == EventTypes.Message: - edit = await self.store.get_applicable_edit(event_id, room_id) - - if edit: + if RelationTypes.REPLACE in aggregations: # If there is an edit replace the content, preserving existing # relations. + edit = aggregations[RelationTypes.REPLACE] # Ensure we take copies of the edit content, otherwise we risk modifying # the original event. @@ -502,27 +458,19 @@ class EventClientSerializer: } # If this event is the start of a thread, include a summary of the replies. - if self._msc3440_enabled: - ( - thread_count, - latest_thread_event, - ) = await self.store.get_thread_summary(event_id, room_id) - if latest_thread_event: - aggregations[RelationTypes.THREAD] = { - # Don't bundle aggregations as this could recurse forever. - "latest_event": await self.serialize_event( - latest_thread_event, time_now, bundle_aggregations=False - ), - "count": thread_count, - } - - # If any bundled aggregations were found, include them. - if aggregations: - serialized_event["unsigned"].setdefault("m.relations", {}).update( - aggregations + if RelationTypes.THREAD in aggregations: + # Serialize the latest thread event. + latest_thread_event = aggregations[RelationTypes.THREAD]["latest_event"] + + # Don't bundle aggregations as this could recurse forever. + aggregations[RelationTypes.THREAD]["latest_event"] = self.serialize_event( + latest_thread_event, time_now, bundle_aggregations=None ) - async def serialize_events( + # Include the bundled aggregations in the event. 
+ serialized_event["unsigned"].setdefault("m.relations", {}).update(aggregations) + + def serialize_events( self, events: Iterable[Union[JsonDict, EventBase]], time_now: int, **kwargs: Any ) -> List[JsonDict]: """Serializes multiple events. @@ -535,9 +483,9 @@ class EventClientSerializer: Returns: The list of serialized events """ - return await yieldable_gather_results( - self.serialize_event, events, time_now=time_now, **kwargs - ) + return [ + self.serialize_event(event, time_now=time_now, **kwargs) for event in events + ] def copy_power_levels_contents( diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index addc0bf000..896168c05c 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -230,6 +230,10 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB # origin, etc etc) assert_params_in_dict(pdu_json, ("type", "depth")) + # Strip any unauthorized values from "unsigned" if they exist + if "unsigned" in pdu_json: + _strip_unsigned_values(pdu_json) + depth = pdu_json["depth"] if not isinstance(depth, int): raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON) @@ -245,3 +249,24 @@ def event_from_pdu_json(pdu_json: JsonDict, room_version: RoomVersion) -> EventB event = make_event_from_dict(pdu_json, room_version) return event + + +def _strip_unsigned_values(pdu_dict: JsonDict) -> None: + """ + Strip any unsigned values unless specifically allowed, as defined by the whitelist. + + pdu: the json dict to strip values from. 
Note that the dict is mutated by this + function + """ + unsigned = pdu_dict["unsigned"] + + if not isinstance(unsigned, dict): + pdu_dict["unsigned"] = {} + + if pdu_dict["type"] == "m.room.member": + whitelist = ["knock_room_state", "invite_room_state", "age"] + else: + whitelist = ["age"] + + filtered_unsigned = {k: v for k, v in unsigned.items() if k in whitelist} + pdu_dict["unsigned"] = filtered_unsigned diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 391b30fbb5..8152e80b88 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -607,18 +607,18 @@ class PerDestinationQueue: self._pending_pdus = [] -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _TransactionQueueManager: """A helper async context manager for pulling stuff off the queues and tracking what was last successfully sent, etc. """ - queue = attr.ib(type=PerDestinationQueue) + queue: PerDestinationQueue - _device_stream_id = attr.ib(type=Optional[int], default=None) - _device_list_id = attr.ib(type=Optional[int], default=None) - _last_stream_ordering = attr.ib(type=Optional[int], default=None) - _pdus = attr.ib(type=List[EventBase], factory=list) + _device_stream_id: Optional[int] = None + _device_list_id: Optional[int] = None + _last_stream_ordering: Optional[int] = None + _pdus: List[EventBase] = attr.Factory(list) async def __aenter__(self) -> Tuple[List[EventBase], List[Edu]]: # First we calculate the EDUs we want to send, if any. diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py index 96273e2f81..bad48713bc 100644 --- a/synapse/handlers/account_data.py +++ b/synapse/handlers/account_data.py @@ -77,7 +77,7 @@ class AccountDataHandler: async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: - """Add some account_data to a room for a user. 
+ """Add some global account_data for a user. Args: user_id: The user to add a tag for. diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py index 85157a138b..00ab5e79bf 100644 --- a/synapse/handlers/admin.py +++ b/synapse/handlers/admin.py @@ -55,21 +55,47 @@ class AdminHandler: async def get_user(self, user: UserID) -> Optional[JsonDict]: """Function to get user details""" - ret = await self.store.get_user_by_id(user.to_string()) - if ret: - profile = await self.store.get_profileinfo(user.localpart) - threepids = await self.store.user_get_threepids(user.to_string()) - external_ids = [ - ({"auth_provider": auth_provider, "external_id": external_id}) - for auth_provider, external_id in await self.store.get_external_ids_by_user( - user.to_string() - ) - ] - ret["displayname"] = profile.display_name - ret["avatar_url"] = profile.avatar_url - ret["threepids"] = threepids - ret["external_ids"] = external_ids - return ret + user_info_dict = await self.store.get_user_by_id(user.to_string()) + if user_info_dict is None: + return None + + # Restrict returned information to a known set of fields. This prevents additional + # fields added to get_user_by_id from modifying Synapse's external API surface. + user_info_to_return = { + "name", + "admin", + "deactivated", + "shadow_banned", + "creation_ts", + "appservice_id", + "consent_server_notice_sent", + "consent_version", + "user_type", + "is_guest", + } + + # Restrict returned keys to a known set. 
+ user_info_dict = { + key: value + for key, value in user_info_dict.items() + if key in user_info_to_return + } + + # Add additional user metadata + profile = await self.store.get_profileinfo(user.localpart) + threepids = await self.store.user_get_threepids(user.to_string()) + external_ids = [ + ({"auth_provider": auth_provider, "external_id": external_id}) + for auth_provider, external_id in await self.store.get_external_ids_by_user( + user.to_string() + ) + ] + user_info_dict["displayname"] = profile.display_name + user_info_dict["avatar_url"] = profile.avatar_url + user_info_dict["threepids"] = threepids + user_info_dict["external_ids"] = external_ids + + return user_info_dict async def export_user_data(self, user_id: str, writer: "ExfiltrationWriter") -> Any: """Write all data we have on the user to the given writer. diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index 84724b207c..2389c9ac52 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -168,25 +168,25 @@ def login_id_phone_to_thirdparty(identifier: JsonDict) -> Dict[str, str]: } -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class SsoLoginExtraAttributes: """Data we track about SAML2 sessions""" # time the session was created, in milliseconds - creation_time = attr.ib(type=int) - extra_attributes = attr.ib(type=JsonDict) + creation_time: int + extra_attributes: JsonDict -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class LoginTokenAttributes: """Data we store in a short-term login token""" - user_id = attr.ib(type=str) + user_id: str - auth_provider_id = attr.ib(type=str) + auth_provider_id: str """The SSO Identity Provider that the user authenticated with, to get this token.""" - auth_provider_session_id = attr.ib(type=Optional[str]) + auth_provider_session_id: Optional[str] """The session ID advertised by the SSO Identity Provider.""" diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 
7665425232..b184a48cb1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -948,8 +948,16 @@ class DeviceListUpdater: devices = [] ignore_devices = True else: + prev_stream_id = await self.store.get_device_list_last_stream_id_for_remote( + user_id + ) cached_devices = await self.store.get_cached_devices_for_user(user_id) - if cached_devices == {d["device_id"]: d for d in devices}: + + # To ensure that a user with no devices is cached, we skip the resync only + # if we have a stream_id from previously writing a cache entry. + if prev_stream_id is not None and cached_devices == { + d["device_id"]: d for d in devices + }: logging.info( "Skipping device list resync for %s, as our cache matches already", user_id, diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 14360b4e40..d4dfddf63f 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -1321,14 +1321,14 @@ def _one_time_keys_match(old_key_json: str, new_key: JsonDict) -> bool: return old_key == new_key_copy -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class SignatureListItem: """An item in the signature list as used by upload_signatures_for_device_keys.""" - signing_key_id = attr.ib(type=str) - target_user_id = attr.ib(type=str) - target_device_id = attr.ib(type=str) - signature = attr.ib(type=JsonDict) + signing_key_id: str + target_user_id: str + target_device_id: str + signature: JsonDict class SigningKeyEduUpdater: diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 1b996c420d..a3add8a586 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -119,7 +119,7 @@ class EventStreamHandler: events.extend(to_add) - chunks = await self._event_serializer.serialize_events( + chunks = self._event_serializer.serialize_events( events, time_now, as_client_event=as_client_event, diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index 601bab67f9..346a06ff49 
100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -170,7 +170,7 @@ class InitialSyncHandler: d["inviter"] = event.sender invite_event = await self.store.get_event(event.event_id) - d["invite"] = await self._event_serializer.serialize_event( + d["invite"] = self._event_serializer.serialize_event( invite_event, time_now, as_client_event=as_client_event, @@ -222,7 +222,7 @@ class InitialSyncHandler: d["messages"] = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( messages, time_now=time_now, as_client_event=as_client_event, @@ -232,7 +232,7 @@ class InitialSyncHandler: "end": await end_token.to_string(self.store), } - d["state"] = await self._event_serializer.serialize_events( + d["state"] = self._event_serializer.serialize_events( current_state.values(), time_now=time_now, as_client_event=as_client_event, @@ -376,16 +376,14 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), }, "state": ( # Don't bundle aggregations as this is a deprecated API. - await self._event_serializer.serialize_events( - room_state.values(), time_now - ) + self._event_serializer.serialize_events(room_state.values(), time_now) ), "presence": [], "receipts": [], @@ -404,7 +402,7 @@ class InitialSyncHandler: # TODO: These concurrently time_now = self.clock.time_msec() # Don't bundle aggregations as this is a deprecated API. - state = await self._event_serializer.serialize_events( + state = self._event_serializer.serialize_events( current_state.values(), time_now ) @@ -480,7 +478,7 @@ class InitialSyncHandler: "messages": { "chunk": ( # Don't bundle aggregations as this is a deprecated API. 
- await self._event_serializer.serialize_events(messages, time_now) + self._event_serializer.serialize_events(messages, time_now) ), "start": await start_token.to_string(self.store), "end": await end_token.to_string(self.store), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 5e3d3886eb..b37250aa38 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -246,7 +246,7 @@ class MessageHandler: room_state = room_state_events[membership_event_id] now = self.clock.time_msec() - events = await self._event_serializer.serialize_events(room_state.values(), now) + events = self._event_serializer.serialize_events(room_state.values(), now) return events async def get_joined_members(self, requester: Requester, room_id: str) -> dict: diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 7469cc55a2..472688f045 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -537,14 +537,16 @@ class PaginationHandler: state_dict = await self.store.get_events(list(state_ids.values())) state = state_dict.values() + aggregations = await self.store.get_bundled_aggregations(events) + time_now = self.clock.time_msec() chunk = { "chunk": ( - await self._event_serializer.serialize_events( + self._event_serializer.serialize_events( events, time_now, - bundle_aggregations=True, + bundle_aggregations=aggregations, as_client_event=as_client_event, ) ), @@ -553,7 +555,7 @@ class PaginationHandler: } if state: - chunk["state"] = await self._event_serializer.serialize_events( + chunk["state"] = self._event_serializer.serialize_events( state, time_now, as_client_event=as_client_event ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index b9c1cbffa5..3d47163f25 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -393,7 +393,9 @@ class RoomCreationHandler: user_id = requester.user.to_string() if not await self.spam_checker.user_may_create_room(user_id): - raise 
SynapseError(403, "You are not permitted to create rooms") + raise SynapseError( + 403, "You are not permitted to create rooms", Codes.FORBIDDEN + ) creation_content: JsonDict = { "room_version": new_room_version.identifier, @@ -685,7 +687,9 @@ class RoomCreationHandler: invite_3pid_list, ) ): - raise SynapseError(403, "You are not permitted to create rooms") + raise SynapseError( + 403, "You are not permitted to create rooms", Codes.FORBIDDEN + ) if ratelimit: await self.request_ratelimiter.ratelimit(requester) @@ -1177,6 +1181,16 @@ class RoomContextHandler: # `filtered` rather than the event we retrieved from the datastore. results["event"] = filtered[0] + # Fetch the aggregations. + aggregations = await self.store.get_bundled_aggregations([results["event"]]) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_before"]) + ) + aggregations.update( + await self.store.get_bundled_aggregations(results["events_after"]) + ) + results["aggregations"] = aggregations + if results["events_after"]: last_event_id = results["events_after"][-1].event_id else: diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py index b2cfe537df..7c60cb0bdd 100644 --- a/synapse/handlers/room_summary.py +++ b/synapse/handlers/room_summary.py @@ -153,6 +153,9 @@ class RoomSummaryHandler: rooms_result: List[JsonDict] = [] events_result: List[JsonDict] = [] + if max_rooms_per_space is None or max_rooms_per_space > MAX_ROOMS_PER_SPACE: + max_rooms_per_space = MAX_ROOMS_PER_SPACE + while room_queue and len(rooms_result) < MAX_ROOMS: queue_entry = room_queue.popleft() room_id = queue_entry.room_id @@ -167,7 +170,7 @@ class RoomSummaryHandler: # The client-specified max_rooms_per_space limit doesn't apply to the # room_id specified in the request, so we ignore it if this is the # first room we are processing. 
- max_children = max_rooms_per_space if processed_rooms else None + max_children = max_rooms_per_space if processed_rooms else MAX_ROOMS if is_in_room: room_entry = await self._summarize_local_room( @@ -209,7 +212,7 @@ class RoomSummaryHandler: # Before returning to the client, remove the allowed_room_ids # and allowed_spaces keys. room.pop("allowed_room_ids", None) - room.pop("allowed_spaces", None) + room.pop("allowed_spaces", None) # historical rooms_result.append(room) events.extend(room_entry.children_state_events) @@ -395,7 +398,7 @@ class RoomSummaryHandler: None, room_id, suggested_only, - # TODO Handle max children. + # Do not limit the maximum children. max_children=None, ) @@ -525,6 +528,10 @@ class RoomSummaryHandler: rooms_result: List[JsonDict] = [] events_result: List[JsonDict] = [] + # Set a limit on the number of rooms to return. + if max_rooms_per_space is None or max_rooms_per_space > MAX_ROOMS_PER_SPACE: + max_rooms_per_space = MAX_ROOMS_PER_SPACE + while room_queue and len(rooms_result) < MAX_ROOMS: room_id = room_queue.popleft() if room_id in processed_rooms: @@ -583,7 +590,9 @@ class RoomSummaryHandler: # Iterate through each child and potentially add it, but not its children, # to the response. - for child_room in root_room_entry.children_state_events: + for child_room in itertools.islice( + root_room_entry.children_state_events, MAX_ROOMS_PER_SPACE + ): room_id = child_room.get("state_key") assert isinstance(room_id, str) # If the room is unknown, skip it. @@ -633,8 +642,8 @@ class RoomSummaryHandler: suggested_only: True if only suggested children should be returned. Otherwise, all children are returned. max_children: - The maximum number of children rooms to include. This is capped - to a server-set limit. + The maximum number of children rooms to include. A value of None + means no limit. Returns: A room entry if the room should be returned. None, otherwise. 
@@ -656,8 +665,13 @@ class RoomSummaryHandler: # we only care about suggested children child_events = filter(_is_suggested_child_event, child_events) - if max_children is None or max_children > MAX_ROOMS_PER_SPACE: - max_children = MAX_ROOMS_PER_SPACE + # TODO max_children is legacy code for the /spaces endpoint. + if max_children is not None: + child_iter: Iterable[EventBase] = itertools.islice( + child_events, max_children + ) + else: + child_iter = child_events stripped_events: List[JsonDict] = [ { @@ -668,7 +682,7 @@ class RoomSummaryHandler: "sender": e.sender, "origin_server_ts": e.origin_server_ts, } - for e in itertools.islice(child_events, max_children) + for e in child_iter ] return _RoomEntry(room_id, room_entry, stripped_events) @@ -988,12 +1002,14 @@ class RoomSummaryHandler: "canonical_alias": stats["canonical_alias"], "num_joined_members": stats["joined_members"], "avatar_url": stats["avatar"], + # plural join_rules is a documentation error but kept for historical + # purposes. Should match /publicRooms. 
"join_rules": stats["join_rules"], + "join_rule": stats["join_rules"], "world_readable": ( stats["history_visibility"] == HistoryVisibility.WORLD_READABLE ), "guest_can_join": stats["guest_access"] == "can_join", - "creation_ts": create_event.origin_server_ts, "room_type": create_event.content.get(EventContentFields.ROOM_TYPE), } diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py index ab7eaab2fb..0b153a6822 100644 --- a/synapse/handlers/search.py +++ b/synapse/handlers/search.py @@ -420,10 +420,10 @@ class SearchHandler: time_now = self.clock.time_msec() for context in contexts.values(): - context["events_before"] = await self._event_serializer.serialize_events( + context["events_before"] = self._event_serializer.serialize_events( context["events_before"], time_now ) - context["events_after"] = await self._event_serializer.serialize_events( + context["events_after"] = self._event_serializer.serialize_events( context["events_after"], time_now ) @@ -441,9 +441,7 @@ class SearchHandler: results.append( { "rank": rank_map[e.event_id], - "result": ( - await self._event_serializer.serialize_event(e, time_now) - ), + "result": self._event_serializer.serialize_event(e, time_now), "context": contexts.get(e.event_id, {}), } ) @@ -457,7 +455,7 @@ class SearchHandler: if state_results: s = {} for room_id, state_events in state_results.items(): - s[room_id] = await self._event_serializer.serialize_events( + s[room_id] = self._event_serializer.serialize_events( state_events, time_now ) diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 65c27bc64a..0bb8b0929e 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -126,45 +126,45 @@ class SsoIdentityProvider(Protocol): raise NotImplementedError() -@attr.s +@attr.s(auto_attribs=True) class UserAttributes: # the localpart of the mxid that the mapper has assigned to the user. # if `None`, the mapper has not picked a userid, and the user should be prompted to # enter one. 
- localpart = attr.ib(type=Optional[str]) - display_name = attr.ib(type=Optional[str], default=None) - emails = attr.ib(type=Collection[str], default=attr.Factory(list)) + localpart: Optional[str] + display_name: Optional[str] = None + emails: Collection[str] = attr.Factory(list) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class UsernameMappingSession: """Data we track about SSO sessions""" # A unique identifier for this SSO provider, e.g. "oidc" or "saml". - auth_provider_id = attr.ib(type=str) + auth_provider_id: str # user ID on the IdP server - remote_user_id = attr.ib(type=str) + remote_user_id: str # attributes returned by the ID mapper - display_name = attr.ib(type=Optional[str]) - emails = attr.ib(type=Collection[str]) + display_name: Optional[str] + emails: Collection[str] # An optional dictionary of extra attributes to be provided to the client in the # login response. - extra_login_attributes = attr.ib(type=Optional[JsonDict]) + extra_login_attributes: Optional[JsonDict] # where to redirect the client back to - client_redirect_url = attr.ib(type=str) + client_redirect_url: str # expiry time for the session, in milliseconds - expiry_time_ms = attr.ib(type=int) + expiry_time_ms: int # choices made by the user - chosen_localpart = attr.ib(type=Optional[str], default=None) - use_display_name = attr.ib(type=bool, default=True) - emails_to_use = attr.ib(type=Collection[str], default=()) - terms_accepted_version = attr.ib(type=Optional[str], default=None) + chosen_localpart: Optional[str] = None + use_display_name: bool = True + emails_to_use: Collection[str] = () + terms_accepted_version: Optional[str] = None # the HTTP cookie used to track the mapping session id diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 7baf3f199c..e1df9b3106 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -60,10 +60,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -# Debug logger for 
https://github.com/matrix-org/synapse/issues/4422 -issue4422_logger = logging.getLogger("synapse.handler.sync.4422_debug") - - # Counts the number of times we returned a non-empty sync. `type` is one of # "initial_sync", "full_state_sync" or "incremental_sync", `lazy_loaded` is # "true" or "false" depending on if the request asked for lazy loaded members or @@ -102,6 +98,9 @@ class TimelineBatch: prev_batch: StreamToken events: List[EventBase] limited: bool + # A mapping of event ID to the bundled aggregations for the above events. + # This is only calculated if limited is true. + bundled_aggregations: Optional[Dict[str, Dict[str, Any]]] = None def __bool__(self) -> bool: """Make the result appear empty if there are no updates. This is used @@ -634,10 +633,17 @@ class SyncHandler: prev_batch_token = now_token.copy_and_replace("room_key", room_key) + # Don't bother to bundle aggregations if the timeline is unlimited, + # as clients will have all the necessary information. + bundled_aggregations = None + if limited or newly_joined_room: + bundled_aggregations = await self.store.get_bundled_aggregations(recents) + return TimelineBatch( events=recents, prev_batch=prev_batch_token, limited=limited or newly_joined_room, + bundled_aggregations=bundled_aggregations, ) async def get_state_after_event( @@ -1161,13 +1167,8 @@ class SyncHandler: num_events = 0 - # debug for https://github.com/matrix-org/synapse/issues/4422 + # debug for https://github.com/matrix-org/synapse/issues/9424 for joined_room in sync_result_builder.joined: - room_id = joined_room.room_id - if room_id in newly_joined_rooms: - issue4422_logger.debug( - "Sync result for newly joined room %s: %r", room_id, joined_room - ) num_events += len(joined_room.timeline.events) log_kv( @@ -1740,18 +1741,6 @@ class SyncHandler: old_mem_ev_id, allow_none=True ) - # debug for #4422 - if has_join: - prev_membership = None - if old_mem_ev: - prev_membership = old_mem_ev.membership - issue4422_logger.debug( - "Previous 
membership for room %s with join: %s (event %s)", - room_id, - prev_membership, - old_mem_ev_id, - ) - if not old_mem_ev or old_mem_ev.membership != Membership.JOIN: newly_joined_rooms.append(room_id) @@ -1893,13 +1882,6 @@ class SyncHandler: upto_token=since_token, ) - if newly_joined: - # debugging for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "RoomSyncResultBuilder events for newly joined room %s: %r", - room_id, - entry.events, - ) room_entries.append(entry) return _RoomChanges( @@ -2077,14 +2059,6 @@ class SyncHandler: # `_load_filtered_recents` can't find any events the user should see # (e.g. due to having ignored the sender of the last 50 events). - if newly_joined: - # debug for https://github.com/matrix-org/synapse/issues/4422 - issue4422_logger.debug( - "Timeline events after filtering in newly-joined room %s: %r", - room_id, - batch, - ) - # When we join the room (or the client requests full_state), we should # send down any existing tags. 
Usually the user won't have tags in a # newly joined room, unless either a) they've joined before or b) the diff --git a/synapse/http/connectproxyclient.py b/synapse/http/connectproxyclient.py index fbafffd69b..203e995bb7 100644 --- a/synapse/http/connectproxyclient.py +++ b/synapse/http/connectproxyclient.py @@ -32,9 +32,9 @@ class ProxyConnectError(ConnectError): pass -@attr.s +@attr.s(auto_attribs=True) class ProxyCredentials: - username_password = attr.ib(type=bytes) + username_password: bytes def as_proxy_authorization_value(self) -> bytes: """ diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index deedde0b5b..2e668363b2 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -123,37 +123,37 @@ class ByteParser(ByteWriteable, Generic[T], abc.ABC): pass -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class MatrixFederationRequest: - method = attr.ib(type=str) + method: str """HTTP method """ - path = attr.ib(type=str) + path: str """HTTP path """ - destination = attr.ib(type=str) + destination: str """The remote server to send the HTTP request to. """ - json = attr.ib(default=None, type=Optional[JsonDict]) + json: Optional[JsonDict] = None """JSON to send in the body. """ - json_callback = attr.ib(default=None, type=Optional[Callable[[], JsonDict]]) + json_callback: Optional[Callable[[], JsonDict]] = None """A callback to generate the JSON. """ - query = attr.ib(default=None, type=Optional[dict]) + query: Optional[dict] = None """Query arguments. 
""" - txn_id = attr.ib(default=None, type=Optional[str]) + txn_id: Optional[str] = None """Unique ID for this request (for logging) """ - uri = attr.ib(init=False, type=bytes) + uri: bytes = attr.ib(init=False) """The URI of this request """ diff --git a/synapse/http/site.py b/synapse/http/site.py index 80f7a2ff58..c180a1d323 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -534,9 +534,9 @@ class XForwardedForRequest(SynapseRequest): @implementer(IAddress) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class _XForwardedForAddress: - host = attr.ib(type=str) + host: str class SynapseSite(Site): diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index 8202d0494d..475756f1db 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -39,7 +39,7 @@ from twisted.python.failure import Failure logger = logging.getLogger(__name__) -@attr.s +@attr.s(slots=True, auto_attribs=True) @implementer(IPushProducer) class LogProducer: """ @@ -54,10 +54,10 @@ class LogProducer: # This is essentially ITCPTransport, but that is missing certain fields # (connected and registerProducer) which are part of the implementation. - transport = attr.ib(type=Connection) - _format = attr.ib(type=Callable[[logging.LogRecord], str]) - _buffer = attr.ib(type=deque) - _paused = attr.ib(default=False, type=bool, init=False) + transport: Connection + _format: Callable[[logging.LogRecord], str] + _buffer: Deque[logging.LogRecord] + _paused: bool = attr.ib(default=False, init=False) def pauseProducing(self): self._paused = True diff --git a/synapse/logging/context.py b/synapse/logging/context.py index d4ee893376..c31c2960ad 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -193,7 +193,7 @@ class ContextResourceUsage: return res -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class ContextRequest: """ A bundle of attributes from the SynapseRequest object. 
@@ -205,15 +205,15 @@ class ContextRequest: their children. """ - request_id = attr.ib(type=str) - ip_address = attr.ib(type=str) - site_tag = attr.ib(type=str) - requester = attr.ib(type=Optional[str]) - authenticated_entity = attr.ib(type=Optional[str]) - method = attr.ib(type=str) - url = attr.ib(type=str) - protocol = attr.ib(type=str) - user_agent = attr.ib(type=str) + request_id: str + ip_address: str + site_tag: str + requester: Optional[str] + authenticated_entity: Optional[str] + method: str + url: str + protocol: str + user_agent: str LoggingContextOrSentinel = Union["LoggingContext", "_Sentinel"] diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 622445e9f4..5672d60de3 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -251,7 +251,7 @@ try: class _WrappedRustReporter(BaseReporter): """Wrap the reporter to ensure `report_span` never throws.""" - _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter)) + _reporter: Reporter = attr.Factory(Reporter) def set_process(self, *args, **kwargs): return self._reporter.set_process(*args, **kwargs) diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index ceef57ad88..ba7ca0f2d4 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -13,7 +13,6 @@ # limitations under the License. 
import functools -import gc import itertools import logging import os @@ -41,7 +40,6 @@ import attr from prometheus_client import CollectorRegistry, Counter, Gauge, Histogram, Metric from prometheus_client.core import ( REGISTRY, - CounterMetricFamily, GaugeHistogramMetricFamily, GaugeMetricFamily, ) @@ -56,13 +54,13 @@ from synapse.metrics._exposition import ( generate_latest, start_http_server, ) +from synapse.metrics._gc import MIN_TIME_BETWEEN_GCS, install_gc_manager from synapse.util.versionstring import get_version_string logger = logging.getLogger(__name__) METRICS_PREFIX = "/_synapse/metrics" -running_on_pypy = platform.python_implementation() == "PyPy" all_gauges: "Dict[str, Union[LaterGauge, InFlightGauge]]" = {} HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") @@ -76,19 +74,17 @@ class RegistryProxy: yield metric -@attr.s(slots=True, hash=True) +@attr.s(slots=True, hash=True, auto_attribs=True) class LaterGauge: - name = attr.ib(type=str) - desc = attr.ib(type=str) - labels = attr.ib(hash=False, type=Optional[Iterable[str]]) + name: str + desc: str + labels: Optional[Iterable[str]] = attr.ib(hash=False) # callback: should either return a value (if there are no labels for this metric), # or dict mapping from a label tuple to a value - caller = attr.ib( - type=Callable[ - [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] - ] - ) + caller: Callable[ + [], Union[Mapping[Tuple[str, ...], Union[int, float]], Union[int, float]] + ] def collect(self) -> Iterable[Metric]: @@ -157,7 +153,9 @@ class InFlightGauge(Generic[MetricsEntry]): # Create a class which have the sub_metrics values as attributes, which # default to 0 on initialization. Used to pass to registered callbacks. 
self._metrics_class: Type[MetricsEntry] = attr.make_class( - "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True + "_MetricsEntry", + attrs={x: attr.ib(default=0) for x in sub_metrics}, + slots=True, ) # Counts number of in flight blocks for a given set of label values @@ -369,121 +367,6 @@ class CPUMetrics: REGISTRY.register(CPUMetrics()) -# -# Python GC metrics -# - -gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) -gc_time = Histogram( - "python_gc_time", - "Time taken to GC (sec)", - ["gen"], - buckets=[ - 0.0025, - 0.005, - 0.01, - 0.025, - 0.05, - 0.10, - 0.25, - 0.50, - 1.00, - 2.50, - 5.00, - 7.50, - 15.00, - 30.00, - 45.00, - 60.00, - ], -) - - -class GCCounts: - def collect(self) -> Iterable[Metric]: - cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) - for n, m in enumerate(gc.get_count()): - cm.add_metric([str(n)], m) - - yield cm - - -if not running_on_pypy: - REGISTRY.register(GCCounts()) - - -# -# PyPy GC / memory metrics -# - - -class PyPyGCStats: - def collect(self) -> Iterable[Metric]: - - # @stats is a pretty-printer object with __str__() returning a nice table, - # plus some fields that contain data from that table. - # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB'). - stats = gc.get_stats(memory_pressure=False) # type: ignore - # @s contains same fields as @stats, but as actual integers. - s = stats._s # type: ignore - - # also note that field naming is completely braindead - # and only vaguely correlates with the pretty-printed table. 
- # >>>> gc.get_stats(False) - # Total memory consumed: - # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory - # in arenas: 3.0MB # s.total_arena_memory - # rawmalloced: 1.7MB # s.total_rawmalloced_memory - # nursery: 4.0MB # s.nursery_size - # raw assembler used: 31.0kB # s.jit_backend_used - # ----------------------------- - # Total: 8.8MB # stats.memory_used_sum - # - # Total memory allocated: - # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory - # in arenas: 30.9MB # s.peak_arena_memory - # rawmalloced: 4.1MB # s.peak_rawmalloced_memory - # nursery: 4.0MB # s.nursery_size - # raw assembler allocated: 1.0MB # s.jit_backend_allocated - # ----------------------------- - # Total: 39.7MB # stats.memory_allocated_sum - # - # Total time spent in GC: 0.073 # s.total_gc_time - - pypy_gc_time = CounterMetricFamily( - "pypy_gc_time_seconds_total", - "Total time spent in PyPy GC", - labels=[], - ) - pypy_gc_time.add_metric([], s.total_gc_time / 1000) - yield pypy_gc_time - - pypy_mem = GaugeMetricFamily( - "pypy_memory_bytes", - "Memory tracked by PyPy allocator", - labels=["state", "class", "kind"], - ) - # memory used by JIT assembler - pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used) - pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated) - # memory used by GCed objects - pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory) - pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory) - pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory) - pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory) - pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size) - pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size) - # totals - pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory) - pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory) - pypy_mem.add_metric(["used", 
"totals", "gc_peak"], s.peak_memory) - pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory) - yield pypy_mem - - -if running_on_pypy: - REGISTRY.register(PyPyGCStats()) - # # Twisted reactor metrics @@ -612,14 +495,6 @@ class ReactorLastSeenMetric: REGISTRY.register(ReactorLastSeenMetric()) -# The minimum time in seconds between GCs for each generation, regardless of the current GC -# thresholds and counts. -MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0) - -# The time (in seconds since the epoch) of the last time we did a GC for each generation. -_last_gc = [0.0, 0.0, 0.0] - - F = TypeVar("F", bound=Callable[..., Any]) @@ -658,34 +533,6 @@ def runUntilCurrentTimer(reactor: ReactorBase, func: F) -> F: global last_ticked last_ticked = end - if running_on_pypy: - return ret - - # Check if we need to do a manual GC (since its been disabled), and do - # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may - # promote an object into gen 2, and we don't want to handle the same - # object multiple times. - threshold = gc.get_threshold() - counts = gc.get_count() - for i in (2, 1, 0): - # We check if we need to do one based on a straightforward - # comparison between the threshold and count. We also do an extra - # check to make sure that we don't a GC too often. - if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]: - if i == 0: - logger.debug("Collecting gc %d", i) - else: - logger.info("Collecting gc %d", i) - - start = time.time() - unreachable = gc.collect(i) - end = time.time() - - _last_gc[i] = end - - gc_time.labels(i).observe(end - start) - gc_unreachable.labels(i).set(unreachable) - return ret return cast(F, f) @@ -701,11 +548,6 @@ try: # runUntilCurrent is called when we have pending calls. It is called once # per iteratation after fd polling. 
reactor.runUntilCurrent = runUntilCurrentTimer(reactor, reactor.runUntilCurrent) # type: ignore - - # We manually run the GC each reactor tick so that we can get some metrics - # about time spent doing GC, - if not running_on_pypy: - gc.disable() except AttributeError: pass @@ -717,4 +559,6 @@ __all__ = [ "LaterGauge", "InFlightGauge", "GaugeBucketCollector", + "MIN_TIME_BETWEEN_GCS", + "install_gc_manager", ] diff --git a/synapse/metrics/_gc.py b/synapse/metrics/_gc.py new file mode 100644 index 0000000000..2bc909efa0 --- /dev/null +++ b/synapse/metrics/_gc.py @@ -0,0 +1,203 @@ +# Copyright 2015-2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import gc +import logging +import platform +import time +from typing import Iterable + +from prometheus_client.core import ( + REGISTRY, + CounterMetricFamily, + Gauge, + GaugeMetricFamily, + Histogram, + Metric, +) + +from twisted.internet import task + +"""Prometheus metrics for garbage collection""" + + +logger = logging.getLogger(__name__) + +# The minimum time in seconds between GCs for each generation, regardless of the current GC +# thresholds and counts. 
+MIN_TIME_BETWEEN_GCS = (1.0, 10.0, 30.0) + +running_on_pypy = platform.python_implementation() == "PyPy" + +# +# Python GC metrics +# + +gc_unreachable = Gauge("python_gc_unreachable_total", "Unreachable GC objects", ["gen"]) +gc_time = Histogram( + "python_gc_time", + "Time taken to GC (sec)", + ["gen"], + buckets=[ + 0.0025, + 0.005, + 0.01, + 0.025, + 0.05, + 0.10, + 0.25, + 0.50, + 1.00, + 2.50, + 5.00, + 7.50, + 15.00, + 30.00, + 45.00, + 60.00, + ], +) + + +class GCCounts: + def collect(self) -> Iterable[Metric]: + cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) + for n, m in enumerate(gc.get_count()): + cm.add_metric([str(n)], m) + + yield cm + + +def install_gc_manager() -> None: + """Disable automatic GC, and replace it with a task that runs every 100ms + + This means that (a) we can limit how often GC runs; (b) we can get some metrics + about GC activity. + + It does nothing on PyPy. + """ + + if running_on_pypy: + return + + REGISTRY.register(GCCounts()) + + gc.disable() + + # The time (in seconds since the epoch) of the last time we did a GC for each generation. + _last_gc = [0.0, 0.0, 0.0] + + def _maybe_gc() -> None: + # Check if we need to do a manual GC (since its been disabled), and do + # one if necessary. Note we go in reverse order as e.g. a gen 1 GC may + # promote an object into gen 2, and we don't want to handle the same + # object multiple times. + threshold = gc.get_threshold() + counts = gc.get_count() + end = time.time() + for i in (2, 1, 0): + # We check if we need to do one based on a straightforward + # comparison between the threshold and count. We also do an extra + # check to make sure that we don't a GC too often. 
+ if threshold[i] < counts[i] and MIN_TIME_BETWEEN_GCS[i] < end - _last_gc[i]: + if i == 0: + logger.debug("Collecting gc %d", i) + else: + logger.info("Collecting gc %d", i) + + start = time.time() + unreachable = gc.collect(i) + end = time.time() + + _last_gc[i] = end + + gc_time.labels(i).observe(end - start) + gc_unreachable.labels(i).set(unreachable) + + gc_task = task.LoopingCall(_maybe_gc) + gc_task.start(0.1) + + +# +# PyPy GC / memory metrics +# + + +class PyPyGCStats: + def collect(self) -> Iterable[Metric]: + + # @stats is a pretty-printer object with __str__() returning a nice table, + # plus some fields that contain data from that table. + # unfortunately, fields are pretty-printed themselves (i. e. '4.5MB'). + stats = gc.get_stats(memory_pressure=False) # type: ignore + # @s contains same fields as @stats, but as actual integers. + s = stats._s # type: ignore + + # also note that field naming is completely braindead + # and only vaguely correlates with the pretty-printed table. 
+ # >>>> gc.get_stats(False) + # Total memory consumed: + # GC used: 8.7MB (peak: 39.0MB) # s.total_gc_memory, s.peak_memory + # in arenas: 3.0MB # s.total_arena_memory + # rawmalloced: 1.7MB # s.total_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler used: 31.0kB # s.jit_backend_used + # ----------------------------- + # Total: 8.8MB # stats.memory_used_sum + # + # Total memory allocated: + # GC allocated: 38.7MB (peak: 41.1MB) # s.total_allocated_memory, s.peak_allocated_memory + # in arenas: 30.9MB # s.peak_arena_memory + # rawmalloced: 4.1MB # s.peak_rawmalloced_memory + # nursery: 4.0MB # s.nursery_size + # raw assembler allocated: 1.0MB # s.jit_backend_allocated + # ----------------------------- + # Total: 39.7MB # stats.memory_allocated_sum + # + # Total time spent in GC: 0.073 # s.total_gc_time + + pypy_gc_time = CounterMetricFamily( + "pypy_gc_time_seconds_total", + "Total time spent in PyPy GC", + labels=[], + ) + pypy_gc_time.add_metric([], s.total_gc_time / 1000) + yield pypy_gc_time + + pypy_mem = GaugeMetricFamily( + "pypy_memory_bytes", + "Memory tracked by PyPy allocator", + labels=["state", "class", "kind"], + ) + # memory used by JIT assembler + pypy_mem.add_metric(["used", "", "jit"], s.jit_backend_used) + pypy_mem.add_metric(["allocated", "", "jit"], s.jit_backend_allocated) + # memory used by GCed objects + pypy_mem.add_metric(["used", "", "arenas"], s.total_arena_memory) + pypy_mem.add_metric(["allocated", "", "arenas"], s.peak_arena_memory) + pypy_mem.add_metric(["used", "", "rawmalloced"], s.total_rawmalloced_memory) + pypy_mem.add_metric(["allocated", "", "rawmalloced"], s.peak_rawmalloced_memory) + pypy_mem.add_metric(["used", "", "nursery"], s.nursery_size) + pypy_mem.add_metric(["allocated", "", "nursery"], s.nursery_size) + # totals + pypy_mem.add_metric(["used", "totals", "gc"], s.total_gc_memory) + pypy_mem.add_metric(["allocated", "totals", "gc"], s.total_allocated_memory) + pypy_mem.add_metric(["used", 
"totals", "gc_peak"], s.peak_memory) + pypy_mem.add_metric(["allocated", "totals", "gc_peak"], s.peak_allocated_memory) + yield pypy_mem + + +if running_on_pypy: + REGISTRY.register(PyPyGCStats()) diff --git a/synapse/notifier.py b/synapse/notifier.py index bbabdb0587..41fd94d772 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -193,15 +193,15 @@ class EventStreamResult: return bool(self.events) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _PendingRoomEventEntry: - event_pos = attr.ib(type=PersistedEventPosition) - extra_users = attr.ib(type=Collection[UserID]) + event_pos: PersistedEventPosition + extra_users: Collection[UserID] - room_id = attr.ib(type=str) - type = attr.ib(type=str) - state_key = attr.ib(type=Optional[str]) - membership = attr.ib(type=Optional[str]) + room_id: str + type: str + state_key: Optional[str] + membership: Optional[str] class Notifier: diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 820f6f3f7e..5176a1c186 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -23,25 +23,25 @@ if TYPE_CHECKING: from synapse.server import HomeServer -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class PusherConfig: """Parameters necessary to configure a pusher.""" - id = attr.ib(type=Optional[str]) - user_name = attr.ib(type=str) - access_token = attr.ib(type=Optional[int]) - profile_tag = attr.ib(type=str) - kind = attr.ib(type=str) - app_id = attr.ib(type=str) - app_display_name = attr.ib(type=str) - device_display_name = attr.ib(type=str) - pushkey = attr.ib(type=str) - ts = attr.ib(type=int) - lang = attr.ib(type=Optional[str]) - data = attr.ib(type=Optional[JsonDict]) - last_stream_ordering = attr.ib(type=int) - last_success = attr.ib(type=Optional[int]) - failing_since = attr.ib(type=Optional[int]) + id: Optional[str] + user_name: str + access_token: Optional[int] + profile_tag: str + kind: str + app_id: str + app_display_name: str + 
device_display_name: str + pushkey: str + ts: int + lang: Optional[str] + data: Optional[JsonDict] + last_stream_ordering: int + last_success: Optional[int] + failing_since: Optional[int] def as_dict(self) -> Dict[str, Any]: """Information that can be retrieved about a pusher after creation.""" @@ -57,12 +57,12 @@ class PusherConfig: } -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class ThrottleParams: """Parameters for controlling the rate of sending pushes via email.""" - last_sent_ts = attr.ib(type=int) - throttle_ms = attr.ib(type=int) + last_sent_ts: int + throttle_ms: int class Pusher(metaclass=abc.ABCMeta): diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 009d8e77b0..bee660893b 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -298,7 +298,7 @@ RulesByUser = Dict[str, List[Rule]] StateGroup = Union[object, int] -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class RulesForRoomData: """The data stored in the cache by `RulesForRoom`. @@ -307,29 +307,29 @@ class RulesForRoomData: """ # event_id -> (user_id, state) - member_map = attr.ib(type=MemberMap, factory=dict) + member_map: MemberMap = attr.Factory(dict) # user_id -> rules - rules_by_user = attr.ib(type=RulesByUser, factory=dict) + rules_by_user: RulesByUser = attr.Factory(dict) # The last state group we updated the caches for. If the state_group of # a new event comes along, we know that we can just return the cached # result. # On invalidation of the rules themselves (if the user changes them), # we invalidate everything and set state_group to `object()` - state_group = attr.ib(type=StateGroup, factory=object) + state_group: StateGroup = attr.Factory(object) # A sequence number to keep track of when we're allowed to update the # cache. We bump the sequence number when we invalidate the cache. 
If # the sequence number changes while we're calculating stuff we should # not update the cache with it. - sequence = attr.ib(type=int, default=0) + sequence: int = 0 # A cache of user_ids that we *know* aren't interesting, e.g. user_ids # owned by AS's, or remote users, etc. (I.e. users we will never need to # calculate push for) # These never need to be invalidated as we will never set up push for # them. - uninteresting_user_set = attr.ib(type=Set[str], factory=set) + uninteresting_user_set: Set[str] = attr.Factory(set) class RulesForRoom: @@ -553,7 +553,7 @@ class RulesForRoom: self.data.state_group = state_group -@attr.attrs(slots=True, frozen=True) +@attr.attrs(slots=True, frozen=True, auto_attribs=True) class _Invalidation: # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules, # which means that it it is stored on the bulk_get_push_rules cache entry. In order @@ -564,8 +564,8 @@ class _Invalidation: # attrs provides suitable __hash__ and __eq__ methods, provided we remember to # set `frozen=True`. 
- cache = attr.ib(type=LruCache) - room_id = attr.ib(type=str) + cache: LruCache + room_id: str def __call__(self) -> None: rules_data = self.cache.get(self.room_id, None, update_metrics=False) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index ff904c2b4a..dadfc57413 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -178,7 +178,7 @@ class Mailer: await self.send_email( email_address, self.email_subjects.email_validation - % {"server_name": self.hs.config.server.server_name}, + % {"server_name": self.hs.config.server.server_name, "app": self.app_name}, template_vars, ) @@ -209,7 +209,7 @@ class Mailer: await self.send_email( email_address, self.email_subjects.email_validation - % {"server_name": self.hs.config.server.server_name}, + % {"server_name": self.hs.config.server.server_name, "app": self.app_name}, template_vars, ) diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py index a390cfcb74..4f4f1ad453 100644 --- a/synapse/replication/tcp/streams/events.py +++ b/synapse/replication/tcp/streams/events.py @@ -50,12 +50,12 @@ data part are: """ -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamRow: """A parsed row from the events replication stream""" - type = attr.ib() # str: the TypeId of one of the *EventsStreamRows - data = attr.ib() # BaseEventsStreamRow + type: str # the TypeId of one of the *EventsStreamRows + data: "BaseEventsStreamRow" class BaseEventsStreamRow: @@ -79,28 +79,28 @@ class BaseEventsStreamRow: return cls(*data) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamEventRow(BaseEventsStreamRow): TypeId = "ev" - event_id = attr.ib(type=str) - room_id = attr.ib(type=str) - type = attr.ib(type=str) - state_key = attr.ib(type=Optional[str]) - redacts = attr.ib(type=Optional[str]) - relates_to = attr.ib(type=Optional[str]) - membership = attr.ib(type=Optional[str]) - 
rejected = attr.ib(type=bool) + event_id: str + room_id: str + type: str + state_key: Optional[str] + redacts: Optional[str] + relates_to: Optional[str] + membership: Optional[str] + rejected: bool -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class EventsStreamCurrentStateRow(BaseEventsStreamRow): TypeId = "state" - room_id = attr.ib() # str - type = attr.ib() # str - state_key = attr.ib() # str - event_id = attr.ib() # str, optional + room_id: str + type: str + state_key: str + event_id: Optional[str] _EventRows: Tuple[Type[BaseEventsStreamRow], ...] = ( diff --git a/synapse/rest/admin/background_updates.py b/synapse/rest/admin/background_updates.py index 6ec00ce0b9..e9bce22a34 100644 --- a/synapse/rest/admin/background_updates.py +++ b/synapse/rest/admin/background_updates.py @@ -123,34 +123,25 @@ class BackgroundUpdateStartJobRestServlet(RestServlet): job_name = body["job_name"] if job_name == "populate_stats_process_rooms": - jobs = [ - { - "update_name": "populate_stats_process_rooms", - "progress_json": "{}", - }, - ] + jobs = [("populate_stats_process_rooms", "{}", "")] elif job_name == "regenerate_directory": jobs = [ - { - "update_name": "populate_user_directory_createtables", - "progress_json": "{}", - "depends_on": "", - }, - { - "update_name": "populate_user_directory_process_rooms", - "progress_json": "{}", - "depends_on": "populate_user_directory_createtables", - }, - { - "update_name": "populate_user_directory_process_users", - "progress_json": "{}", - "depends_on": "populate_user_directory_process_rooms", - }, - { - "update_name": "populate_user_directory_cleanup", - "progress_json": "{}", - "depends_on": "populate_user_directory_process_users", - }, + ("populate_user_directory_createtables", "{}", ""), + ( + "populate_user_directory_process_rooms", + "{}", + "populate_user_directory_createtables", + ), + ( + "populate_user_directory_process_users", + "{}", + "populate_user_directory_process_rooms", + ), + ( 
+ "populate_user_directory_cleanup", + "{}", + "populate_user_directory_process_users", + ), ] else: raise SynapseError(HTTPStatus.BAD_REQUEST, "Invalid job_name") @@ -158,6 +149,7 @@ class BackgroundUpdateStartJobRestServlet(RestServlet): try: await self._store.db_pool.simple_insert_many( table="background_updates", + keys=("update_name", "progress_json", "depends_on"), values=jobs, desc=f"admin_api_run_{job_name}", ) diff --git a/synapse/rest/admin/federation.py b/synapse/rest/admin/federation.py index 50d88c9109..8cd3fa189e 100644 --- a/synapse/rest/admin/federation.py +++ b/synapse/rest/admin/federation.py @@ -111,25 +111,37 @@ class DestinationsRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: await assert_requester_is_admin(self._auth, request) + if not await self._store.is_destination_known(destination): + raise NotFoundError("Unknown destination") + destination_retry_timings = await self._store.get_destination_retry_timings( destination ) - if not destination_retry_timings: - raise NotFoundError("Unknown destination") - last_successful_stream_ordering = ( await self._store.get_destination_last_successful_stream_ordering( destination ) ) - response = { + response: JsonDict = { "destination": destination, - "failure_ts": destination_retry_timings.failure_ts, - "retry_last_ts": destination_retry_timings.retry_last_ts, - "retry_interval": destination_retry_timings.retry_interval, "last_successful_stream_ordering": last_successful_stream_ordering, } + if destination_retry_timings: + response = { + **response, + "failure_ts": destination_retry_timings.failure_ts, + "retry_last_ts": destination_retry_timings.retry_last_ts, + "retry_interval": destination_retry_timings.retry_interval, + } + else: + response = { + **response, + "failure_ts": None, + "retry_last_ts": 0, + "retry_interval": 0, + } + return HTTPStatus.OK, response diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py index 7236e4027f..299f5c9eb0 100644 --- 
a/synapse/rest/admin/media.py +++ b/synapse/rest/admin/media.py @@ -466,7 +466,7 @@ class UserMediaRestServlet(RestServlet): ) deleted_media, total = await self.media_repository.delete_local_media_ids( - ([row["media_id"] for row in media]) + [row["media_id"] for row in media] ) return HTTPStatus.OK, {"deleted_media": deleted_media, "total": total} diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py index 6030373ebc..2e714ac87b 100644 --- a/synapse/rest/admin/rooms.py +++ b/synapse/rest/admin/rooms.py @@ -424,7 +424,7 @@ class RoomStateRestServlet(RestServlet): event_ids = await self.store.get_current_state_ids(room_id) events = await self.store.get_events(event_ids.values()) now = self.clock.time_msec() - room_state = await self._event_serializer.serialize_events(events.values(), now) + room_state = self._event_serializer.serialize_events(events.values(), now) ret = {"state": room_state} return HTTPStatus.OK, ret @@ -744,22 +744,22 @@ class RoomEventContextServlet(RestServlet): ) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( + results["events_before"] = self._event_serializer.serialize_events( results["events_before"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( + results["event"] = self._event_serializer.serialize_event( results["event"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["events_after"] = await self._event_serializer.serialize_events( + results["events_after"] = self._event_serializer.serialize_events( results["events_after"], time_now, - bundle_aggregations=True, + bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/admin/users.py 
b/synapse/rest/admin/users.py index 78e795c347..c2617ee30c 100644 --- a/synapse/rest/admin/users.py +++ b/synapse/rest/admin/users.py @@ -173,12 +173,11 @@ class UserRestServletV2(RestServlet): if not self.hs.is_mine(target_user): raise SynapseError(HTTPStatus.BAD_REQUEST, "Can only look up local users") - ret = await self.admin_handler.get_user(target_user) - - if not ret: + user_info_dict = await self.admin_handler.get_user(target_user) + if not user_info_dict: raise NotFoundError("User not found") - return HTTPStatus.OK, ret + return HTTPStatus.OK, user_info_dict async def on_PUT( self, request: SynapseRequest, user_id: str @@ -399,10 +398,10 @@ class UserRestServletV2(RestServlet): target_user, requester, body["avatar_url"], True ) - user = await self.admin_handler.get_user(target_user) - assert user is not None + user_info_dict = await self.admin_handler.get_user(target_user) + assert user_info_dict is not None - return 201, user + return HTTPStatus.CREATED, user_info_dict class UserRegisterServlet(RestServlet): diff --git a/synapse/rest/client/events.py b/synapse/rest/client/events.py index 13b72a045a..672c821061 100644 --- a/synapse/rest/client/events.py +++ b/synapse/rest/client/events.py @@ -91,7 +91,7 @@ class EventRestServlet(RestServlet): time_now = self.clock.time_msec() if event: - result = await self._event_serializer.serialize_event(event, time_now) + result = self._event_serializer.serialize_event(event, time_now) return 200, result else: return 404, "Event not found." 
diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index acd0c9e135..8e427a96a3 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -72,7 +72,7 @@ class NotificationsServlet(RestServlet): "actions": pa.actions, "ts": pa.received_ts, "event": ( - await self._event_serializer.serialize_event( + self._event_serializer.serialize_event( notif_events[pa.event_id], self.clock.time_msec(), event_format=format_event_for_client_v2_without_room_id, diff --git a/synapse/rest/client/relations.py b/synapse/rest/client/relations.py index 5815650ee6..37d949a71e 100644 --- a/synapse/rest/client/relations.py +++ b/synapse/rest/client/relations.py @@ -19,28 +19,20 @@ any time to reflect changes in the MSC. """ import logging -from typing import TYPE_CHECKING, Awaitable, Optional, Tuple +from typing import TYPE_CHECKING, Optional, Tuple -from synapse.api.constants import EventTypes, RelationTypes -from synapse.api.errors import ShadowBanError, SynapseError +from synapse.api.constants import RelationTypes +from synapse.api.errors import SynapseError from synapse.http.server import HttpServer -from synapse.http.servlet import ( - RestServlet, - parse_integer, - parse_json_object_from_request, - parse_string, -) +from synapse.http.servlet import RestServlet, parse_integer, parse_string from synapse.http.site import SynapseRequest -from synapse.rest.client.transactions import HttpTransactionCache +from synapse.rest.client._base import client_patterns from synapse.storage.relations import ( AggregationPaginationToken, PaginationChunk, RelationPaginationToken, ) from synapse.types import JsonDict -from synapse.util.stringutils import random_string - -from ._base import client_patterns if TYPE_CHECKING: from synapse.server import HomeServer @@ -48,112 +40,6 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -class RelationSendServlet(RestServlet): - """Helper API for sending events that have relation 
data. - - Example API shape to send a 👍 reaction to a room: - - POST /rooms/!foo/send_relation/$bar/m.annotation/m.reaction?key=%F0%9F%91%8D - {} - - { - "event_id": "$foobar" - } - """ - - PATTERN = ( - "/rooms/(?P<room_id>[^/]*)/send_relation" - "/(?P<parent_id>[^/]*)/(?P<relation_type>[^/]*)/(?P<event_type>[^/]*)" - ) - - def __init__(self, hs: "HomeServer"): - super().__init__() - self.auth = hs.get_auth() - self.event_creation_handler = hs.get_event_creation_handler() - self.txns = HttpTransactionCache(hs) - - def register(self, http_server: HttpServer) -> None: - http_server.register_paths( - "POST", - client_patterns(self.PATTERN + "$", releases=()), - self.on_PUT_or_POST, - self.__class__.__name__, - ) - http_server.register_paths( - "PUT", - client_patterns(self.PATTERN + "/(?P<txn_id>[^/]*)$", releases=()), - self.on_PUT, - self.__class__.__name__, - ) - - def on_PUT( - self, - request: SynapseRequest, - room_id: str, - parent_id: str, - relation_type: str, - event_type: str, - txn_id: Optional[str] = None, - ) -> Awaitable[Tuple[int, JsonDict]]: - return self.txns.fetch_or_execute_request( - request, - self.on_PUT_or_POST, - request, - room_id, - parent_id, - relation_type, - event_type, - txn_id, - ) - - async def on_PUT_or_POST( - self, - request: SynapseRequest, - room_id: str, - parent_id: str, - relation_type: str, - event_type: str, - txn_id: Optional[str] = None, - ) -> Tuple[int, JsonDict]: - requester = await self.auth.get_user_by_req(request, allow_guest=True) - - if event_type == EventTypes.Member: - # Add relations to a membership is meaningless, so we just deny it - # at the CS API rather than trying to handle it correctly. 
- raise SynapseError(400, "Cannot send member events with relations") - - content = parse_json_object_from_request(request) - - aggregation_key = parse_string(request, "key", encoding="utf-8") - - content["m.relates_to"] = { - "event_id": parent_id, - "rel_type": relation_type, - } - if aggregation_key is not None: - content["m.relates_to"]["key"] = aggregation_key - - event_dict = { - "type": event_type, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - } - - try: - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - requester, event_dict=event_dict, txn_id=txn_id - ) - event_id = event.event_id - except ShadowBanError: - event_id = "$" + random_string(43) - - return 200, {"event_id": event_id} - - class RelationPaginationServlet(RestServlet): """API to paginate relations on an event by topological ordering, optionally filtered by relation type and event type. @@ -227,13 +113,14 @@ class RelationPaginationServlet(RestServlet): now = self.clock.time_msec() # Do not bundle aggregations when retrieving the original event because # we want the content before relations are applied to it. - original_event = await self._event_serializer.serialize_event( - event, now, bundle_aggregations=False + original_event = self._event_serializer.serialize_event( + event, now, bundle_aggregations=None ) # The relations returned for the requested event do include their # bundled aggregations. 
- serialized_events = await self._event_serializer.serialize_events( - events, now, bundle_aggregations=True + aggregations = await self.store.get_bundled_aggregations(events) + serialized_events = self._event_serializer.serialize_events( + events, now, bundle_aggregations=aggregations ) return_value = pagination_chunk.to_dict() @@ -422,7 +309,7 @@ class RelationAggregationGroupPaginationServlet(RestServlet): ) now = self.clock.time_msec() - serialized_events = await self._event_serializer.serialize_events(events, now) + serialized_events = self._event_serializer.serialize_events(events, now) return_value = result.to_dict() return_value["chunk"] = serialized_events @@ -431,7 +318,6 @@ class RelationAggregationGroupPaginationServlet(RestServlet): def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: - RelationSendServlet(hs).register(http_server) RelationPaginationServlet(hs).register(http_server) RelationAggregationPaginationServlet(hs).register(http_server) RelationAggregationGroupPaginationServlet(hs).register(http_server) diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index 40330749e5..da6014900a 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -642,6 +642,7 @@ class RoomEventServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() self.clock = hs.get_clock() + self._store = hs.get_datastore() self.event_handler = hs.get_event_handler() self._event_serializer = hs.get_event_client_serializer() self.auth = hs.get_auth() @@ -660,10 +661,13 @@ class RoomEventServlet(RestServlet): # https://matrix.org/docs/spec/client_server/r0.5.0#get-matrix-client-r0-rooms-roomid-event-eventid raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) - time_now = self.clock.time_msec() if event: - event_dict = await self._event_serializer.serialize_event( - event, time_now, bundle_aggregations=True + # Ensure there are bundled aggregations available. 
+ aggregations = await self._store.get_bundled_aggregations([event]) + + time_now = self.clock.time_msec() + event_dict = self._event_serializer.serialize_event( + event, time_now, bundle_aggregations=aggregations ) return 200, event_dict @@ -708,16 +712,20 @@ class RoomEventContextServlet(RestServlet): raise SynapseError(404, "Event not found.", errcode=Codes.NOT_FOUND) time_now = self.clock.time_msec() - results["events_before"] = await self._event_serializer.serialize_events( - results["events_before"], time_now, bundle_aggregations=True + results["events_before"] = self._event_serializer.serialize_events( + results["events_before"], + time_now, + bundle_aggregations=results["aggregations"], ) - results["event"] = await self._event_serializer.serialize_event( - results["event"], time_now, bundle_aggregations=True + results["event"] = self._event_serializer.serialize_event( + results["event"], time_now, bundle_aggregations=results["aggregations"] ) - results["events_after"] = await self._event_serializer.serialize_events( - results["events_after"], time_now, bundle_aggregations=True + results["events_after"] = self._event_serializer.serialize_events( + results["events_after"], + time_now, + bundle_aggregations=results["aggregations"], ) - results["state"] = await self._event_serializer.serialize_events( + results["state"] = self._event_serializer.serialize_events( results["state"], time_now ) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index e99a943d0d..d20ae1421e 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -17,7 +17,6 @@ from collections import defaultdict from typing import ( TYPE_CHECKING, Any, - Awaitable, Callable, Dict, Iterable, @@ -395,7 +394,7 @@ class SyncRestServlet(RestServlet): """ invited = {} for room in rooms: - invite = await self._event_serializer.serialize_event( + invite = self._event_serializer.serialize_event( room.invite, time_now, token_id=token_id, @@ -432,7 +431,7 @@ class 
SyncRestServlet(RestServlet): """ knocked = {} for room in rooms: - knock = await self._event_serializer.serialize_event( + knock = self._event_serializer.serialize_event( room.knock, time_now, token_id=token_id, @@ -525,21 +524,14 @@ class SyncRestServlet(RestServlet): The room, encoded in our response format """ - def serialize(events: Iterable[EventBase]) -> Awaitable[List[JsonDict]]: + def serialize( + events: Iterable[EventBase], + aggregations: Optional[Dict[str, Dict[str, Any]]] = None, + ) -> List[JsonDict]: return self._event_serializer.serialize_events( events, time_now=time_now, - # Don't bother to bundle aggregations if the timeline is unlimited, - # as clients will have all the necessary information. - # bundle_aggregations=room.timeline.limited, - # - # richvdh 2021-12-15: disable this temporarily as it has too high an - # overhead for initialsyncs. We need to figure out a way that the - # bundling can be done *before* the events are stored in the - # SyncResponseCache so that this part can be synchronous. - # - # Ensure to re-enable the test at tests/rest/client/test_relations.py::RelationsTestCase.test_bundled_aggregations. 
- bundle_aggregations=False, + bundle_aggregations=aggregations, token_id=token_id, event_format=event_formatter, only_event_fields=only_fields, @@ -561,8 +553,10 @@ class SyncRestServlet(RestServlet): event.room_id, ) - serialized_state = await serialize(state_events) - serialized_timeline = await serialize(timeline_events) + serialized_state = serialize(state_events) + serialized_timeline = serialize( + timeline_events, room.timeline.bundled_aggregations + ) account_data = room.account_data diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index fca239d8c7..9f6c251caf 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -343,7 +343,7 @@ class SpamMediaException(NotFoundError): """ -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class ReadableFileWrapper: """Wrapper that allows reading a file in chunks, yielding to the reactor, and writing to a callback. @@ -354,8 +354,8 @@ class ReadableFileWrapper: CHUNK_SIZE = 2 ** 14 - clock = attr.ib(type=Clock) - path = attr.ib(type=str) + clock: Clock + path: str async def write_chunks_to(self, callback: Callable[[bytes], None]) -> None: """Reads the file in chunks and calls the callback with each chunk.""" diff --git a/synapse/server.py b/synapse/server.py index 185e40e4da..3032f0b738 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -759,7 +759,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer(self) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 69ac8c3423..923e31587e 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -450,19 +450,19 @@ class StateHandler: return {key: state_map[ev_id] for key, ev_id in new_state.items()} -@attr.s(slots=True) 
+@attr.s(slots=True, auto_attribs=True) class _StateResMetrics: """Keeps track of some usage metrics about state res.""" # System and User CPU time, in seconds - cpu_time = attr.ib(type=float, default=0.0) + cpu_time: float = 0.0 # time spent on database transactions (excluding scheduling time). This roughly # corresponds to the amount of work done on the db server, excluding event fetches. - db_time = attr.ib(type=float, default=0.0) + db_time: float = 0.0 # number of events fetched from the db. - db_events = attr.ib(type=int, default=0) + db_events: int = 0 _biggest_room_by_cpu_counter = Counter( diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 2cacc7dd6c..57cc1d76e0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -143,7 +143,7 @@ def make_conn( return db_conn -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class LoggingDatabaseConnection: """A wrapper around a database connection that returns `LoggingTransaction` as its cursor class. @@ -151,9 +151,9 @@ class LoggingDatabaseConnection: This is mainly used on startup to ensure that queries get logged correctly """ - conn = attr.ib(type=Connection) - engine = attr.ib(type=BaseDatabaseEngine) - default_txn_name = attr.ib(type=str) + conn: Connection + engine: BaseDatabaseEngine + default_txn_name: str def cursor( self, *, txn_name=None, after_callbacks=None, exception_callbacks=None @@ -934,56 +934,6 @@ class DatabasePool: txn.execute(sql, vals) async def simple_insert_many( - self, table: str, values: List[Dict[str, Any]], desc: str - ) -> None: - """Executes an INSERT query on the named table. - - The input is given as a list of dicts, with one dict per row. - Generally simple_insert_many_values should be preferred for new code. 
- - Args: - table: string giving the table name - values: dict of new column names and values for them - desc: description of the transaction, for logging and metrics - """ - await self.runInteraction(desc, self.simple_insert_many_txn, table, values) - - @staticmethod - def simple_insert_many_txn( - txn: LoggingTransaction, table: str, values: List[Dict[str, Any]] - ) -> None: - """Executes an INSERT query on the named table. - - The input is given as a list of dicts, with one dict per row. - Generally simple_insert_many_values_txn should be preferred for new code. - - Args: - txn: The transaction to use. - table: string giving the table name - values: dict of new column names and values for them - """ - if not values: - return - - # This is a *slight* abomination to get a list of tuples of key names - # and a list of tuples of value names. - # - # i.e. [{"a": 1, "b": 2}, {"c": 3, "d": 4}] - # => [("a", "b",), ("c", "d",)] and [(1, 2,), (3, 4,)] - # - # The sort is to ensure that we don't rely on dictionary iteration - # order. 
- keys, vals = zip( - *(zip(*(sorted(i.items(), key=lambda kv: kv[0]))) for i in values if i) - ) - - for k in keys: - if k != keys[0]: - raise RuntimeError("All items must have the same keys") - - return DatabasePool.simple_insert_many_values_txn(txn, table, keys[0], vals) - - async def simple_insert_many_values( self, table: str, keys: Collection[str], @@ -1002,11 +952,11 @@ class DatabasePool: desc: description of the transaction, for logging and metrics """ await self.runInteraction( - desc, self.simple_insert_many_values_txn, table, keys, values + desc, self.simple_insert_many_txn, table, keys, values ) @staticmethod - def simple_insert_many_values_txn( + def simple_insert_many_txn( txn: LoggingTransaction, table: str, keys: Collection[str], diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 32a553fdd7..ef475e18c7 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -450,7 +450,7 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): async def add_account_data_for_user( self, user_id: str, account_data_type: str, content: JsonDict ) -> int: - """Add some account_data to a room for a user. + """Add some global account_data for a user. Args: user_id: The user to add a tag for. 
@@ -536,9 +536,9 @@ class AccountDataWorkerStore(CacheInvalidationWorkerStore): self.db_pool.simple_insert_many_txn( txn, table="ignored_users", + keys=("ignorer_user_id", "ignored_user_id"), values=[ - {"ignorer_user_id": user_id, "ignored_user_id": u} - for u in currently_ignored_users - previously_ignored_users + (user_id, u) for u in currently_ignored_users - previously_ignored_users ], ) diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 3682cb6a81..4eca97189b 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -432,14 +432,21 @@ class DeviceInboxWorkerStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="device_federation_outbox", + keys=( + "destination", + "stream_id", + "queued_ts", + "messages_json", + "instance_name", + ), values=[ - { - "destination": destination, - "stream_id": stream_id, - "queued_ts": now_ms, - "messages_json": json_encoder.encode(edu), - "instance_name": self._instance_name, - } + ( + destination, + stream_id, + now_ms, + json_encoder.encode(edu), + self._instance_name, + ) for destination, edu in remote_messages_by_destination.items() ], ) @@ -571,14 +578,9 @@ class DeviceInboxWorkerStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="device_inbox", + keys=("user_id", "device_id", "stream_id", "message_json", "instance_name"), values=[ - { - "user_id": user_id, - "device_id": device_id, - "stream_id": stream_id, - "message_json": message_json, - "instance_name": self._instance_name, - } + (user_id, device_id, stream_id, message_json, self._instance_name) for user_id, messages_by_device in local_by_user_then_device.items() for device_id, message_json in messages_by_device.items() ], diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index bc7e876047..8f0cd0695f 100644 --- a/synapse/storage/databases/main/devices.py +++ 
b/synapse/storage/databases/main/devices.py @@ -781,7 +781,7 @@ class DeviceWorkerStore(SQLBaseStore): @cached(max_entries=10000) async def get_device_list_last_stream_id_for_remote( self, user_id: str - ) -> Optional[Any]: + ) -> Optional[str]: """Get the last stream_id we got for a user. May be None if we haven't got any information for them. """ @@ -797,7 +797,9 @@ class DeviceWorkerStore(SQLBaseStore): cached_method_name="get_device_list_last_stream_id_for_remote", list_name="user_ids", ) - async def get_device_list_last_stream_id_for_remotes(self, user_ids: Iterable[str]): + async def get_device_list_last_stream_id_for_remotes( + self, user_ids: Iterable[str] + ) -> Dict[str, Optional[str]]: rows = await self.db_pool.simple_select_many_batch( table="device_lists_remote_extremeties", column="user_id", @@ -1384,6 +1386,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): content: JsonDict, stream_id: str, ) -> None: + """Delete, update or insert a cache entry for this (user, device) pair.""" if content.get("deleted"): self.db_pool.simple_delete_txn( txn, @@ -1443,6 +1446,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): def _update_remote_device_list_cache_txn( self, txn: LoggingTransaction, user_id: str, devices: List[dict], stream_id: int ) -> None: + """Replace the list of cached devices for this user with the given list.""" self.db_pool.simple_delete_txn( txn, table="device_lists_remote_cache", keyvalues={"user_id": user_id} ) @@ -1450,12 +1454,9 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="device_lists_remote_cache", + keys=("user_id", "device_id", "content"), values=[ - { - "user_id": user_id, - "device_id": content["device_id"], - "content": json_encoder.encode(content), - } + (user_id, content["device_id"], json_encoder.encode(content)) for content in devices ], ) @@ -1543,8 +1544,9 @@ class DeviceStore(DeviceWorkerStore, 
DeviceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="device_lists_stream", + keys=("stream_id", "user_id", "device_id"), values=[ - {"stream_id": stream_id, "user_id": user_id, "device_id": device_id} + (stream_id, user_id, device_id) for stream_id, device_id in zip(stream_ids, device_ids) ], ) @@ -1571,18 +1573,27 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="device_lists_outbound_pokes", + keys=( + "destination", + "stream_id", + "user_id", + "device_id", + "sent", + "ts", + "opentracing_context", + ), values=[ - { - "destination": destination, - "stream_id": next(next_stream_id), - "user_id": user_id, - "device_id": device_id, - "sent": False, - "ts": now, - "opentracing_context": json_encoder.encode(context) + ( + destination, + next(next_stream_id), + user_id, + device_id, + False, + now, + json_encoder.encode(context) if whitelisted_homeserver(destination) else "{}", - } + ) for destination in hosts for device_id in device_ids ], diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index f76c6121e8..5903fdaf00 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -112,10 +112,8 @@ class DirectoryWorkerStore(CacheInvalidationWorkerStore): self.db_pool.simple_insert_many_txn( txn, table="room_alias_servers", - values=[ - {"room_alias": room_alias.to_string(), "server": server} - for server in servers - ], + keys=("room_alias", "server"), + values=[(room_alias.to_string(), server) for server in servers], ) self._invalidate_cache_and_stream( diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index 0cb48b9dd7..b789a588a5 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -110,16 +110,16 @@ class EndToEndRoomKeyStore(SQLBaseStore): values 
= [] for (room_id, session_id, room_key) in room_keys: values.append( - { - "user_id": user_id, - "version": version_int, - "room_id": room_id, - "session_id": session_id, - "first_message_index": room_key["first_message_index"], - "forwarded_count": room_key["forwarded_count"], - "is_verified": room_key["is_verified"], - "session_data": json_encoder.encode(room_key["session_data"]), - } + ( + user_id, + version_int, + room_id, + session_id, + room_key["first_message_index"], + room_key["forwarded_count"], + room_key["is_verified"], + json_encoder.encode(room_key["session_data"]), + ) ) log_kv( { @@ -131,7 +131,19 @@ class EndToEndRoomKeyStore(SQLBaseStore): ) await self.db_pool.simple_insert_many( - table="e2e_room_keys", values=values, desc="add_e2e_room_keys" + table="e2e_room_keys", + keys=( + "user_id", + "version", + "room_id", + "session_id", + "first_message_index", + "forwarded_count", + "is_verified", + "session_data", + ), + values=values, + desc="add_e2e_room_keys", ) @trace diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index 57b5ffbad3..1f8447b507 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -50,16 +50,16 @@ if TYPE_CHECKING: from synapse.server import HomeServer -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class DeviceKeyLookupResult: """The type returned by get_e2e_device_keys_and_signatures""" - display_name = attr.ib(type=Optional[str]) + display_name: Optional[str] # the key data from e2e_device_keys_json. Typically includes fields like # "algorithm", "keys" (including the curve25519 identity key and the ed25519 signing # key) and "signatures" (a map from (user id) to (key id/device_id) to signature.) 
- keys = attr.ib(type=Optional[JsonDict]) + keys: Optional[JsonDict] class EndToEndKeyBackgroundStore(SQLBaseStore): @@ -387,15 +387,16 @@ class EndToEndKeyWorkerStore(EndToEndKeyBackgroundStore, CacheInvalidationWorker self.db_pool.simple_insert_many_txn( txn, table="e2e_one_time_keys_json", + keys=( + "user_id", + "device_id", + "algorithm", + "key_id", + "ts_added_ms", + "key_json", + ), values=[ - { - "user_id": user_id, - "device_id": device_id, - "algorithm": algorithm, - "key_id": key_id, - "ts_added_ms": time_now, - "key_json": json_bytes, - } + (user_id, device_id, algorithm, key_id, time_now, json_bytes) for algorithm, key_id, json_bytes in new_keys ], ) @@ -1186,15 +1187,22 @@ class EndToEndKeyStore(EndToEndKeyWorkerStore, SQLBaseStore): """ await self.db_pool.simple_insert_many( "e2e_cross_signing_signatures", - [ - { - "user_id": user_id, - "key_id": item.signing_key_id, - "target_user_id": item.target_user_id, - "target_device_id": item.target_device_id, - "signature": item.signature, - } + keys=( + "user_id", + "key_id", + "target_user_id", + "target_device_id", + "signature", + ), + values=[ + ( + user_id, + item.signing_key_id, + item.target_user_id, + item.target_device_id, + item.signature, + ) for item in signatures ], - "add_e2e_signing_key", + desc="add_e2e_signing_key", ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index a98e6b2593..b7c4c62222 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -875,14 +875,21 @@ class EventPushActionsWorkerStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="event_push_summary", + keys=( + "user_id", + "room_id", + "notif_count", + "unread_count", + "stream_ordering", + ), values=[ - { - "user_id": user_id, - "room_id": room_id, - "notif_count": summary.notif_count, - "unread_count": summary.unread_count, - "stream_ordering": 
summary.stream_ordering, - } + ( + user_id, + room_id, + summary.notif_count, + summary.unread_count, + summary.stream_ordering, + ) for ((user_id, room_id), summary) in summaries.items() if summary.old_user_id is None ], diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index dd255aefb9..de3b48524b 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -69,7 +69,7 @@ event_counter = Counter( ) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class DeltaState: """Deltas to use to update the `current_state_events` table. @@ -80,9 +80,9 @@ class DeltaState: should e.g. be removed from `current_state_events` table. """ - to_delete = attr.ib(type=List[Tuple[str, str]]) - to_insert = attr.ib(type=StateMap[str]) - no_longer_in_room = attr.ib(type=bool, default=False) + to_delete: List[Tuple[str, str]] + to_insert: StateMap[str] + no_longer_in_room: bool = False class PersistEventsStore: @@ -442,12 +442,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_auth", + keys=("event_id", "room_id", "auth_id"), values=[ - { - "event_id": event.event_id, - "room_id": event.room_id, - "auth_id": auth_id, - } + (event.event_id, event.room_id, auth_id) for event in events for auth_id in event.auth_event_ids() if event.is_state() @@ -675,8 +672,9 @@ class PersistEventsStore: db_pool.simple_insert_many_txn( txn, table="event_auth_chains", + keys=("event_id", "chain_id", "sequence_number"), values=[ - {"event_id": event_id, "chain_id": c_id, "sequence_number": seq} + (event_id, c_id, seq) for event_id, (c_id, seq) in new_chain_tuples.items() ], ) @@ -782,13 +780,14 @@ class PersistEventsStore: db_pool.simple_insert_many_txn( txn, table="event_auth_chain_links", + keys=( + "origin_chain_id", + "origin_sequence_number", + "target_chain_id", + "target_sequence_number", + ), values=[ - { - "origin_chain_id": source_id, - "origin_sequence_number": 
source_seq, - "target_chain_id": target_id, - "target_sequence_number": target_seq, - } + (source_id, source_seq, target_id, target_seq) for ( source_id, source_seq, @@ -943,20 +942,28 @@ class PersistEventsStore: txn_id = getattr(event.internal_metadata, "txn_id", None) if token_id and txn_id: to_insert.append( - { - "event_id": event.event_id, - "room_id": event.room_id, - "user_id": event.sender, - "token_id": token_id, - "txn_id": txn_id, - "inserted_ts": self._clock.time_msec(), - } + ( + event.event_id, + event.room_id, + event.sender, + token_id, + txn_id, + self._clock.time_msec(), + ) ) if to_insert: self.db_pool.simple_insert_many_txn( txn, table="event_txn_id", + keys=( + "event_id", + "room_id", + "user_id", + "token_id", + "txn_id", + "inserted_ts", + ), values=to_insert, ) @@ -1161,8 +1168,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_forward_extremities", + keys=("event_id", "room_id"), values=[ - {"event_id": ev_id, "room_id": room_id} + (ev_id, room_id) for room_id, new_extrem in new_forward_extremities.items() for ev_id in new_extrem ], @@ -1174,12 +1182,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="stream_ordering_to_exterm", + keys=("room_id", "event_id", "stream_ordering"), values=[ - { - "room_id": room_id, - "event_id": event_id, - "stream_ordering": max_stream_order, - } + (room_id, event_id, max_stream_order) for room_id, new_extrem in new_forward_extremities.items() for event_id in new_extrem ], @@ -1342,7 +1347,7 @@ class PersistEventsStore: d.pop("redacted_because", None) return d - self.db_pool.simple_insert_many_values_txn( + self.db_pool.simple_insert_many_txn( txn, table="event_json", keys=("event_id", "room_id", "internal_metadata", "json", "format_version"), @@ -1358,7 +1363,7 @@ class PersistEventsStore: ), ) - self.db_pool.simple_insert_many_values_txn( + self.db_pool.simple_insert_many_txn( txn, table="events", keys=( @@ -1412,7 +1417,7 @@ class 
PersistEventsStore: ) txn.execute(sql + clause, [False] + args) - self.db_pool.simple_insert_many_values_txn( + self.db_pool.simple_insert_many_txn( txn, table="state_events", keys=("event_id", "room_id", "type", "state_key"), @@ -1622,14 +1627,9 @@ class PersistEventsStore: return self.db_pool.simple_insert_many_txn( txn=txn, table="event_labels", + keys=("event_id", "label", "room_id", "topological_ordering"), values=[ - { - "event_id": event_id, - "label": label, - "room_id": room_id, - "topological_ordering": topological_ordering, - } - for label in labels + (event_id, label, room_id, topological_ordering) for label in labels ], ) @@ -1657,16 +1657,13 @@ class PersistEventsStore: vals = [] for event in events: ref_alg, ref_hash_bytes = compute_event_reference_hash(event) - vals.append( - { - "event_id": event.event_id, - "algorithm": ref_alg, - "hash": memoryview(ref_hash_bytes), - } - ) + vals.append((event.event_id, ref_alg, memoryview(ref_hash_bytes))) self.db_pool.simple_insert_many_txn( - txn, table="event_reference_hashes", values=vals + txn, + table="event_reference_hashes", + keys=("event_id", "algorithm", "hash"), + values=vals, ) def _store_room_members_txn( @@ -1689,18 +1686,25 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="room_memberships", + keys=( + "event_id", + "user_id", + "sender", + "room_id", + "membership", + "display_name", + "avatar_url", + ), values=[ - { - "event_id": event.event_id, - "user_id": event.state_key, - "sender": event.user_id, - "room_id": event.room_id, - "membership": event.membership, - "display_name": non_null_str_or_none( - event.content.get("displayname") - ), - "avatar_url": non_null_str_or_none(event.content.get("avatar_url")), - } + ( + event.event_id, + event.state_key, + event.user_id, + event.room_id, + event.membership, + non_null_str_or_none(event.content.get("displayname")), + non_null_str_or_none(event.content.get("avatar_url")), + ) for event in events ], ) @@ -2163,13 
+2167,9 @@ class PersistEventsStore: self.db_pool.simple_insert_many_txn( txn, table="event_edges", + keys=("event_id", "prev_event_id", "room_id", "is_state"), values=[ - { - "event_id": ev.event_id, - "prev_event_id": e_id, - "room_id": ev.room_id, - "is_state": False, - } + (ev.event_id, e_id, ev.room_id, False) for ev in events for e_id in ev.prev_event_ids() ], @@ -2226,17 +2226,17 @@ class PersistEventsStore: ) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _LinkMap: """A helper type for tracking links between chains.""" # Stores the set of links as nested maps: source chain ID -> target chain ID # -> source sequence number -> target sequence number. - maps = attr.ib(type=Dict[int, Dict[int, Dict[int, int]]], factory=dict) + maps: Dict[int, Dict[int, Dict[int, int]]] = attr.Factory(dict) # Stores the links that have been added (with new set to true), as tuples of # `(source chain ID, source sequence no, target chain ID, target sequence no.)` - additions = attr.ib(type=Set[Tuple[int, int, int, int]], factory=set) + additions: Set[Tuple[int, int, int, int]] = attr.Factory(set) def add_link( self, diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index a68f14ba48..d5f0059665 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -65,22 +65,22 @@ class _BackgroundUpdates: REPLACE_STREAM_ORDERING_COLUMN = "replace_stream_ordering_column" -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _CalculateChainCover: """Return value for _calculate_chain_cover_txn.""" # The last room_id/depth/stream processed. 
- room_id = attr.ib(type=str) - depth = attr.ib(type=int) - stream = attr.ib(type=int) + room_id: str + depth: int + stream: int # Number of rows processed - processed_count = attr.ib(type=int) + processed_count: int # Map from room_id to last depth/stream processed for each room that we have # processed all events for (i.e. the rooms we can flip the # `has_auth_chain_index` for) - finished_room_map = attr.ib(type=Dict[str, Tuple[int, int]]) + finished_room_map: Dict[str, Tuple[int, int]] class EventsBackgroundUpdatesStore(SQLBaseStore): @@ -684,13 +684,14 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): self.db_pool.simple_insert_many_txn( txn=txn, table="event_labels", + keys=("event_id", "label", "room_id", "topological_ordering"), values=[ - { - "event_id": event_id, - "label": label, - "room_id": event_json["room_id"], - "topological_ordering": event_json["depth"], - } + ( + event_id, + label, + event_json["room_id"], + event_json["depth"], + ) for label in event_json["content"].get( EventContentFields.LABELS, [] ) @@ -803,29 +804,19 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if not has_state: state_events.append( - { - "event_id": event.event_id, - "room_id": event.room_id, - "type": event.type, - "state_key": event.state_key, - } + (event.event_id, event.room_id, event.type, event.state_key) ) if not has_event_auth: # Old, dodgy, events may have duplicate auth events, which we # need to deduplicate as we have a unique constraint. 
for auth_id in set(event.auth_event_ids()): - auth_events.append( - { - "room_id": event.room_id, - "event_id": event.event_id, - "auth_id": auth_id, - } - ) + auth_events.append((event.event_id, event.room_id, auth_id)) if state_events: await self.db_pool.simple_insert_many( table="state_events", + keys=("event_id", "room_id", "type", "state_key"), values=state_events, desc="_rejected_events_metadata_state_events", ) @@ -833,6 +824,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore): if auth_events: await self.db_pool.simple_insert_many( table="event_auth", + keys=("event_id", "room_id", "auth_id"), values=auth_events, desc="_rejected_events_metadata_event_auth", ) diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index cbf9ec38f7..4f05811a77 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -129,18 +129,29 @@ class PresenceStore(PresenceBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="presence_stream", + keys=( + "stream_id", + "user_id", + "state", + "last_active_ts", + "last_federation_update_ts", + "last_user_sync_ts", + "status_msg", + "currently_active", + "instance_name", + ), values=[ - { - "stream_id": stream_id, - "user_id": state.user_id, - "state": state.state, - "last_active_ts": state.last_active_ts, - "last_federation_update_ts": state.last_federation_update_ts, - "last_user_sync_ts": state.last_user_sync_ts, - "status_msg": state.status_msg, - "currently_active": state.currently_active, - "instance_name": self._instance_name, - } + ( + stream_id, + state.user_id, + state.state, + state.last_active_ts, + state.last_federation_update_ts, + state.last_user_sync_ts, + state.status_msg, + state.currently_active, + self._instance_name, + ) for stream_id, state in zip(stream_orderings, presence_states) ], ) diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 
747b4f31df..cf64cd63a4 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -561,13 +561,9 @@ class PusherStore(PusherWorkerStore): self.db_pool.simple_insert_many_txn( txn, table="deleted_pushers", + keys=("stream_id", "app_id", "pushkey", "user_id"), values=[ - { - "stream_id": stream_id, - "app_id": pusher.app_id, - "pushkey": pusher.pushkey, - "user_id": user_id, - } + (stream_id, pusher.app_id, pusher.pushkey, user_id) for stream_id, pusher in zip(stream_ids, pushers) ], ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 4175c82a25..aac94fa464 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -51,7 +51,7 @@ class ExternalIDReuseException(Exception): pass -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class TokenLookupResult: """Result of looking up an access token. @@ -69,14 +69,14 @@ class TokenLookupResult: cached. """ - user_id = attr.ib(type=str) - is_guest = attr.ib(type=bool, default=False) - shadow_banned = attr.ib(type=bool, default=False) - token_id = attr.ib(type=Optional[int], default=None) - device_id = attr.ib(type=Optional[str], default=None) - valid_until_ms = attr.ib(type=Optional[int], default=None) - token_owner = attr.ib(type=str) - token_used = attr.ib(type=bool, default=False) + user_id: str + is_guest: bool = False + shadow_banned: bool = False + token_id: Optional[int] = None + device_id: Optional[str] = None + valid_until_ms: Optional[int] = None + token_owner: str = attr.ib() + token_used: bool = False # Make the token owner default to the user ID, which is the common case. 
@token_owner.default diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 4ff6aed253..c6c4bd18da 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -13,14 +13,30 @@ # limitations under the License. import logging -from typing import List, Optional, Tuple, Union, cast +from typing import ( + TYPE_CHECKING, + Any, + Dict, + Iterable, + List, + Optional, + Tuple, + Union, + cast, +) import attr +from frozendict import frozendict -from synapse.api.constants import RelationTypes +from synapse.api.constants import EventTypes, RelationTypes from synapse.events import EventBase from synapse.storage._base import SQLBaseStore -from synapse.storage.database import LoggingTransaction, make_in_list_sql_clause +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) from synapse.storage.databases.main.stream import generate_pagination_where_clause from synapse.storage.relations import ( AggregationPaginationToken, @@ -29,10 +45,24 @@ from synapse.storage.relations import ( ) from synapse.util.caches.descriptors import cached +if TYPE_CHECKING: + from synapse.server import HomeServer + logger = logging.getLogger(__name__) class RelationsWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._msc1849_enabled = hs.config.experimental.msc1849_enabled + self._msc3440_enabled = hs.config.experimental.msc3440_enabled + @cached(tree=True) async def get_relations_for_event( self, @@ -515,6 +545,98 @@ class RelationsWorkerStore(SQLBaseStore): "get_if_user_has_annotated_event", _get_if_user_has_annotated_event ) + async def _get_bundled_aggregation_for_event( + self, event: EventBase + ) -> Optional[Dict[str, Any]]: + """Generate bundled aggregations for an event. 
+ + Note that this does not use a cache, but depends on cached methods. + + Args: + event: The event to calculate bundled aggregations for. + + Returns: + The bundled aggregations for an event, if bundled aggregations are + enabled and the event can have bundled aggregations. + """ + # State events and redacted events do not get bundled aggregations. + if event.is_state() or event.internal_metadata.is_redacted(): + return None + + # Do not bundle aggregations for an event which represents an edit or an + # annotation. It does not make sense for them to have related events. + relates_to = event.content.get("m.relates_to") + if isinstance(relates_to, (dict, frozendict)): + relation_type = relates_to.get("rel_type") + if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE): + return None + + event_id = event.event_id + room_id = event.room_id + + # The bundled aggregations to include, a mapping of relation type to a + # type-specific value. Some types include the direct return type here + # while others need more processing during serialization. + aggregations: Dict[str, Any] = {} + + annotations = await self.get_aggregation_groups_for_event(event_id, room_id) + if annotations.chunk: + aggregations[RelationTypes.ANNOTATION] = annotations.to_dict() + + references = await self.get_relations_for_event( + event_id, room_id, RelationTypes.REFERENCE, direction="f" + ) + if references.chunk: + aggregations[RelationTypes.REFERENCE] = references.to_dict() + + edit = None + if event.type == EventTypes.Message: + edit = await self.get_applicable_edit(event_id, room_id) + + if edit: + aggregations[RelationTypes.REPLACE] = edit + + # If this event is the start of a thread, include a summary of the replies. + if self._msc3440_enabled: + ( + thread_count, + latest_thread_event, + ) = await self.get_thread_summary(event_id, room_id) + if latest_thread_event: + aggregations[RelationTypes.THREAD] = { + # Don't bundle aggregations as this could recurse forever. 
+ "latest_event": latest_thread_event, + "count": thread_count, + } + + # Store the bundled aggregations in the event metadata for later use. + return aggregations + + async def get_bundled_aggregations( + self, events: Iterable[EventBase] + ) -> Dict[str, Dict[str, Any]]: + """Generate bundled aggregations for events. + + Args: + events: The iterable of events to calculate bundled aggregations for. + + Returns: + A map of event ID to the bundled aggregation for the event. Not all + events may have bundled aggregations in the results. + """ + # If bundled aggregations are disabled, nothing to do. + if not self._msc1849_enabled: + return {} + + # TODO Parallelize. + results = {} + for event in events: + event_result = await self._get_bundled_aggregation_for_event(event) + if event_result is not None: + results[event.event_id] = event_result + + return results + class RelationsStore(RelationsWorkerStore): pass diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index cda80d6511..4489732fda 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1177,18 +1177,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _JoinedHostsCache: """The cached data used by the `_get_joined_hosts_cache`.""" # Dict of host to the set of their users in the room at the state group. - hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict) + hosts_to_joined_users: Dict[str, Set[str]] = attr.Factory(dict) # The state group `hosts_to_joined_users` is derived from. Will be an object # if the instance is newly created or if the state is not based on a state # group. (An object is used as a sentinel value to ensure that it never is # equal to anything else). 
- state_group = attr.ib(type=Union[object, int], factory=object) + state_group: Union[object, int] = attr.Factory(object) def __len__(self): return sum(len(v) for v in self.hosts_to_joined_users.values()) diff --git a/synapse/storage/databases/main/session.py b/synapse/storage/databases/main/session.py index 5a97120437..e8c776b97a 100644 --- a/synapse/storage/databases/main/session.py +++ b/synapse/storage/databases/main/session.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 6c299cafa5..4b78b4d098 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -560,3 +560,14 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): return await self.db_pool.runInteraction( "get_destinations_paginate_txn", get_destinations_paginate_txn ) + + async def is_destination_known(self, destination: str) -> bool: + """Check if a destination is known to the server.""" + result = await self.db_pool.simple_select_one_onecol( + table="destinations", + keyvalues={"destination": destination}, + retcol="1", + allow_none=True, + desc="is_destination_known", + ) + return bool(result) diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index a1a1a6a14a..2d339b6008 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -23,19 +23,19 @@ from synapse.types import JsonDict from synapse.util import json_encoder, stringutils -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class UIAuthSessionData: - session_id = attr.ib(type=str) + session_id: str # The dictionary from the client root level, not the 'auth' key. 
- clientdict = attr.ib(type=JsonDict) + clientdict: JsonDict # The URI and method the session was intiatied with. These are checked at # each stage of the authentication to ensure that the asked for operation # has not changed. - uri = attr.ib(type=str) - method = attr.ib(type=str) + uri: str + method: str # A string description of the operation that the current authentication is # authorising. - description = attr.ib(type=str) + description: str class UIAuthWorkerStore(SQLBaseStore): diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 0f9b8575d3..f7c778bdf2 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -105,8 +105,10 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): GROUP BY room_id """ txn.execute(sql) - rooms = [{"room_id": x[0], "events": x[1]} for x in txn.fetchall()] - self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_rooms", rooms) + rooms = list(txn.fetchall()) + self.db_pool.simple_insert_many_txn( + txn, TEMP_TABLE + "_rooms", keys=("room_id", "events"), values=rooms + ) del rooms sql = ( @@ -117,9 +119,11 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): txn.execute(sql) txn.execute("SELECT name FROM users") - users = [{"user_id": x[0]} for x in txn.fetchall()] + users = list(txn.fetchall()) - self.db_pool.simple_insert_many_txn(txn, TEMP_TABLE + "_users", users) + self.db_pool.simple_insert_many_txn( + txn, TEMP_TABLE + "_users", keys=("user_id",), values=users + ) new_pos = await self.get_max_stream_id_in_current_state_deltas() await self.db_pool.runInteraction( diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py index eb1118d2cb..5de70f31d2 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py @@ -327,14 +327,15 @@ class 
StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=( + "state_group", + "room_id", + "type", + "state_key", + "event_id", + ), values=[ - { - "state_group": state_group, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (state_group, room_id, key[0], key[1], state_id) for key, state_id in delta_state.items() ], ) diff --git a/synapse/storage/databases/state/store.py b/synapse/storage/databases/state/store.py index c4c8c0021b..7614d76ac6 100644 --- a/synapse/storage/databases/state/store.py +++ b/synapse/storage/databases/state/store.py @@ -460,14 +460,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), values=[ - { - "state_group": state_group, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (state_group, room_id, key[0], key[1], state_id) for key, state_id in delta_ids.items() ], ) @@ -475,14 +470,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), values=[ - { - "state_group": state_group, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (state_group, room_id, key[0], key[1], state_id) for key, state_id in current_state_ids.items() ], ) @@ -589,14 +579,9 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore): self.db_pool.simple_insert_many_txn( txn, table="state_groups_state", + keys=("state_group", "room_id", "type", "state_key", "event_id"), values=[ - { - "state_group": sg, - "room_id": room_id, - "type": key[0], - "state_key": key[1], - "event_id": state_id, - } + (sg, room_id, key[0], key[1], state_id) for key, 
state_id in curr_state.items() ], ) diff --git a/synapse/storage/keys.py b/synapse/storage/keys.py index 540adb8781..71584f3f74 100644 --- a/synapse/storage/keys.py +++ b/synapse/storage/keys.py @@ -21,7 +21,7 @@ from signedjson.types import VerifyKey logger = logging.getLogger(__name__) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class FetchKeyResult: - verify_key = attr.ib(type=VerifyKey) # the key itself - valid_until_ts = attr.ib(type=int) # how long we can use this key for + verify_key: VerifyKey # the key itself + valid_until_ts: int # how long we can use this key for diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index e45adfcb55..1823e18720 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -696,7 +696,7 @@ def _get_or_create_schema_state( ) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _DirectoryListing: """Helper class to store schema file name and the absolute path to it. @@ -705,5 +705,5 @@ class _DirectoryListing: `file_name` attr is kept first. """ - file_name = attr.ib(type=str) - absolute_path = attr.ib(type=str) + file_name: str + absolute_path: str diff --git a/synapse/storage/relations.py b/synapse/storage/relations.py index 10a46b5e82..b1536c1ca4 100644 --- a/synapse/storage/relations.py +++ b/synapse/storage/relations.py @@ -23,7 +23,7 @@ from synapse.types import JsonDict logger = logging.getLogger(__name__) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class PaginationChunk: """Returned by relation pagination APIs. @@ -35,9 +35,9 @@ class PaginationChunk: None then there are no previous results. 
""" - chunk = attr.ib(type=List[JsonDict]) - next_batch = attr.ib(type=Optional[Any], default=None) - prev_batch = attr.ib(type=Optional[Any], default=None) + chunk: List[JsonDict] + next_batch: Optional[Any] = None + prev_batch: Optional[Any] = None def to_dict(self) -> Dict[str, Any]: d = {"chunk": self.chunk} @@ -51,7 +51,7 @@ class PaginationChunk: return d -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class RelationPaginationToken: """Pagination token for relation pagination API. @@ -64,8 +64,8 @@ class RelationPaginationToken: stream: The stream ordering of the boundary event. """ - topological = attr.ib(type=int) - stream = attr.ib(type=int) + topological: int + stream: int @staticmethod def from_string(string: str) -> "RelationPaginationToken": @@ -82,7 +82,7 @@ class RelationPaginationToken: return attr.astuple(self) -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class AggregationPaginationToken: """Pagination token for relation aggregation pagination API. @@ -94,8 +94,8 @@ class AggregationPaginationToken: stream: The MAX stream ordering in the boundary group. """ - count = attr.ib(type=int) - stream = attr.ib(type=int) + count: int + stream: int @staticmethod def from_string(string: str) -> "AggregationPaginationToken": diff --git a/synapse/storage/state.py b/synapse/storage/state.py index b5ba1560d1..df8b2f1088 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py @@ -45,7 +45,7 @@ logger = logging.getLogger(__name__) T = TypeVar("T") -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class StateFilter: """A filter used when querying for state. @@ -58,8 +58,8 @@ class StateFilter: appear in `types`. 
""" - types = attr.ib(type="frozendict[str, Optional[FrozenSet[str]]]") - include_others = attr.ib(default=False, type=bool) + types: "frozendict[str, Optional[FrozenSet[str]]]" + include_others: bool = False def __attrs_post_init__(self): # If `include_others` is set we canonicalise the filter by removing diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index b8112e1c05..3c13859faa 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -762,13 +762,13 @@ class _AsyncCtxManagerWrapper(Generic[T]): return self.inner.__exit__(exc_type, exc, tb) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _MultiWriterCtxManager: """Async context manager returned by MultiWriterIdGenerator""" - id_gen = attr.ib(type=MultiWriterIdGenerator) - multiple_ids = attr.ib(type=Optional[int], default=None) - stream_ids = attr.ib(type=List[int], factory=list) + id_gen: MultiWriterIdGenerator + multiple_ids: Optional[int] = None + stream_ids: List[int] = attr.Factory(list) async def __aenter__(self) -> Union[int, List[int]]: # It's safe to run this in autocommit mode as fetching values from a diff --git a/synapse/streams/config.py b/synapse/streams/config.py index c08d591f29..b52723e2b8 100644 --- a/synapse/streams/config.py +++ b/synapse/streams/config.py @@ -28,14 +28,14 @@ logger = logging.getLogger(__name__) MAX_LIMIT = 1000 -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class PaginationConfig: """A configuration object which stores pagination parameters.""" - from_token = attr.ib(type=Optional[StreamToken]) - to_token = attr.ib(type=Optional[StreamToken]) - direction = attr.ib(type=str) - limit = attr.ib(type=Optional[int]) + from_token: Optional[StreamToken] + to_token: Optional[StreamToken] + direction: str + limit: Optional[int] @classmethod async def from_request( diff --git a/synapse/types.py b/synapse/types.py index 42aeaf6270..f89fb216a6 100644 --- 
a/synapse/types.py +++ b/synapse/types.py @@ -20,7 +20,9 @@ from typing import ( Any, ClassVar, Dict, + List, Mapping, + Match, MutableMapping, Optional, Tuple, @@ -79,7 +81,7 @@ class ISynapseReactor( """The interfaces necessary for Synapse to function.""" -@attr.s(frozen=True, slots=True) +@attr.s(frozen=True, slots=True, auto_attribs=True) class Requester: """ Represents the user making a request @@ -97,13 +99,13 @@ class Requester: "puppeting" the user. """ - user = attr.ib(type="UserID") - access_token_id = attr.ib(type=Optional[int]) - is_guest = attr.ib(type=bool) - shadow_banned = attr.ib(type=bool) - device_id = attr.ib(type=Optional[str]) - app_service = attr.ib(type=Optional["ApplicationService"]) - authenticated_entity = attr.ib(type=str) + user: "UserID" + access_token_id: Optional[int] + is_guest: bool + shadow_banned: bool + device_id: Optional[str] + app_service: Optional["ApplicationService"] + authenticated_entity: str def serialize(self): """Converts self to a type that can be serialized as JSON, and then @@ -210,7 +212,7 @@ def get_localpart_from_id(string: str) -> str: DS = TypeVar("DS", bound="DomainSpecificString") -@attr.s(slots=True, frozen=True, repr=False) +@attr.s(slots=True, frozen=True, repr=False, auto_attribs=True) class DomainSpecificString(metaclass=abc.ABCMeta): """Common base class among ID/name strings that have a local part and a domain name, prefixed with a sigil. @@ -223,8 +225,8 @@ class DomainSpecificString(metaclass=abc.ABCMeta): SIGIL: ClassVar[str] = abc.abstractproperty() # type: ignore - localpart = attr.ib(type=str) - domain = attr.ib(type=str) + localpart: str + domain: str # Because this is a frozen class, it is deeply immutable. 
def __copy__(self): @@ -380,7 +382,7 @@ def map_username_to_mxid_localpart( onto different mxids Returns: - unicode: string suitable for a mxid localpart + string suitable for a mxid localpart """ if not isinstance(username, bytes): username = username.encode("utf-8") @@ -388,29 +390,23 @@ def map_username_to_mxid_localpart( # first we sort out upper-case characters if case_sensitive: - def f1(m): + def f1(m: Match[bytes]) -> bytes: return b"_" + m.group().lower() username = UPPER_CASE_PATTERN.sub(f1, username) else: username = username.lower() - # then we sort out non-ascii characters - def f2(m): - g = m.group()[0] - if isinstance(g, str): - # on python 2, we need to do a ord(). On python 3, the - # byte itself will do. - g = ord(g) - return b"=%02x" % (g,) + # then we sort out non-ascii characters by converting to the hex equivalent. + def f2(m: Match[bytes]) -> bytes: + return b"=%02x" % (m.group()[0],) username = NON_MXID_CHARACTER_PATTERN.sub(f2, username) # we also do the =-escaping to mxids starting with an underscore. username = re.sub(b"^_", b"=5f", username) - # we should now only have ascii bytes left, so can decode back to a - # unicode. + # we should now only have ascii bytes left, so can decode back to a string. return username.decode("ascii") @@ -466,14 +462,12 @@ class RoomStreamToken: attributes, must be hashable. 
""" - topological = attr.ib( - type=Optional[int], + topological: Optional[int] = attr.ib( validator=attr.validators.optional(attr.validators.instance_of(int)), ) - stream = attr.ib(type=int, validator=attr.validators.instance_of(int)) + stream: int = attr.ib(validator=attr.validators.instance_of(int)) - instance_map = attr.ib( - type="frozendict[str, int]", + instance_map: "frozendict[str, int]" = attr.ib( factory=frozendict, validator=attr.validators.deep_mapping( key_validator=attr.validators.instance_of(str), @@ -482,7 +476,7 @@ class RoomStreamToken: ), ) - def __attrs_post_init__(self): + def __attrs_post_init__(self) -> None: """Validates that both `topological` and `instance_map` aren't set.""" if self.instance_map and self.topological: @@ -598,7 +592,7 @@ class RoomStreamToken: return "s%d" % (self.stream,) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class StreamToken: """A collection of positions within multiple streams. @@ -606,20 +600,20 @@ class StreamToken: must be hashable. 
""" - room_key = attr.ib( - type=RoomStreamToken, validator=attr.validators.instance_of(RoomStreamToken) + room_key: RoomStreamToken = attr.ib( + validator=attr.validators.instance_of(RoomStreamToken) ) - presence_key = attr.ib(type=int) - typing_key = attr.ib(type=int) - receipt_key = attr.ib(type=int) - account_data_key = attr.ib(type=int) - push_rules_key = attr.ib(type=int) - to_device_key = attr.ib(type=int) - device_list_key = attr.ib(type=int) - groups_key = attr.ib(type=int) + presence_key: int + typing_key: int + receipt_key: int + account_data_key: int + push_rules_key: int + to_device_key: int + device_list_key: int + groups_key: int _SEPARATOR = "_" - START: "StreamToken" + START: ClassVar["StreamToken"] @classmethod async def from_string(cls, store: "DataStore", string: str) -> "StreamToken": @@ -679,7 +673,7 @@ class StreamToken: StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0) -@attr.s(slots=True, frozen=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class PersistedEventPosition: """Position of a newly persisted event with instance that persisted it. @@ -687,8 +681,8 @@ class PersistedEventPosition: RoomStreamToken. 
""" - instance_name = attr.ib(type=str) - stream = attr.ib(type=int) + instance_name: str + stream: int def persisted_after(self, token: RoomStreamToken) -> bool: return token.get_stream_pos_for_instance(self.instance_name) < self.stream @@ -738,15 +732,15 @@ class ThirdPartyInstanceID: __str__ = to_string -@attr.s(slots=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class ReadReceipt: """Information about a read-receipt""" - room_id = attr.ib() - receipt_type = attr.ib() - user_id = attr.ib() - event_ids = attr.ib() - data = attr.ib() + room_id: str + receipt_type: str + user_id: str + event_ids: List[str] + data: JsonDict def get_verify_key_from_cross_signing_key(key_info): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 150a04b53e..3f7299aff7 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -309,12 +309,12 @@ def gather_results( # type: ignore[misc] return deferred.addCallback(tuple) -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class _LinearizerEntry: # The number of things executing. - count = attr.ib(type=int) + count: int # Deferreds for the things blocked from executing. - deferreds = attr.ib(type=collections.OrderedDict) + deferreds: collections.OrderedDict class Linearizer: diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 485ddb1893..d267703df0 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -33,7 +33,7 @@ DV = TypeVar("DV") # This class can't be generic because it uses slots with attrs. # See: https://github.com/python-attrs/attrs/issues/313 -@attr.s(slots=True) +@attr.s(slots=True, auto_attribs=True) class DictionaryEntry: # should be: Generic[DKT, DV]. """Returned when getting an entry from the cache @@ -41,14 +41,13 @@ class DictionaryEntry: # should be: Generic[DKT, DV]. full: Whether the cache has the full or dict or just some keys. 
If not full then not all requested keys will necessarily be present in `value` - known_absent: Keys that were looked up in the dict and were not - there. + known_absent: Keys that were looked up in the dict and were not there. value: The full or partial dict value """ - full = attr.ib(type=bool) - known_absent = attr.ib(type=Set[Any]) # should be: Set[DKT] - value = attr.ib(type=Dict[Any, Any]) # should be: Dict[DKT, DV] + full: bool + known_absent: Set[Any] # should be: Set[DKT] + value: Dict[Any, Any] # should be: Dict[DKT, DV] def __len__(self) -> int: return len(self.value) diff --git a/tests/api/test_auth.py b/tests/api/test_auth.py index a2dfa1ed05..4b53b6d40b 100644 --- a/tests/api/test_auth.py +++ b/tests/api/test_auth.py @@ -274,6 +274,39 @@ class AuthTestCase(unittest.HomeserverTestCase): self.assertEquals(failure.value.code, 400) self.assertEquals(failure.value.errcode, Codes.EXCLUSIVE) + def test_get_user_by_req__puppeted_token__not_tracking_puppeted_mau(self): + self.store.get_user_by_access_token = simple_async_mock( + TokenLookupResult( + user_id="@baldrick:matrix.org", + device_id="device", + token_owner="@admin:matrix.org", + ) + ) + self.store.insert_client_ip = simple_async_mock(None) + request = Mock(args={}) + request.getClientIP.return_value = "127.0.0.1" + request.args[b"access_token"] = [self.test_token] + request.requestHeaders.getRawHeaders = mock_getRawHeaders() + self.get_success(self.auth.get_user_by_req(request)) + self.store.insert_client_ip.assert_called_once() + + def test_get_user_by_req__puppeted_token__tracking_puppeted_mau(self): + self.auth._track_puppeted_user_ips = True + self.store.get_user_by_access_token = simple_async_mock( + TokenLookupResult( + user_id="@baldrick:matrix.org", + device_id="device", + token_owner="@admin:matrix.org", + ) + ) + self.store.insert_client_ip = simple_async_mock(None) + request = Mock(args={}) + request.getClientIP.return_value = "127.0.0.1" + request.args[b"access_token"] = [self.test_token] 
+ request.requestHeaders.getRawHeaders = mock_getRawHeaders() + self.get_success(self.auth.get_user_by_req(request)) + self.assertEquals(self.store.insert_client_ip.call_count, 2) + def test_get_user_from_macaroon(self): self.store.get_user_by_access_token = simple_async_mock( TokenLookupResult(user_id="@baldrick:matrix.org", device_id="device") diff --git a/tests/handlers/test_e2e_keys.py b/tests/handlers/test_e2e_keys.py index ddcf3ee348..734ed84d78 100644 --- a/tests/handlers/test_e2e_keys.py +++ b/tests/handlers/test_e2e_keys.py @@ -13,8 +13,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +from typing import Iterable from unittest import mock +from parameterized import parameterized from signedjson import key as key, sign as sign from twisted.internet import defer @@ -23,6 +25,7 @@ from synapse.api.constants import RoomEncryptionAlgorithms from synapse.api.errors import Codes, SynapseError from tests import unittest +from tests.test_utils import make_awaitable class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): @@ -765,6 +768,8 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): remote_user_id = "@test:other" local_user_id = "@test:test" + # Pretend we're sharing a room with the user we're querying. If not, + # `_query_devices_for_destination` will return early. self.store.get_rooms_for_user = mock.Mock( return_value=defer.succeed({"some_room_id"}) ) @@ -831,3 +836,94 @@ class E2eKeysHandlerTestCase(unittest.HomeserverTestCase): } }, ) + + @parameterized.expand( + [ + # The remote homeserver's response indicates that this user has 0/1/2 devices. + ([],), + (["device_1"],), + (["device_1", "device_2"],), + ] + ) + def test_query_all_devices_caches_result(self, device_ids: Iterable[str]): + """Test that requests for all of a remote user's devices are cached. 
+ + We do this by asserting that only one call over federation was made, and that + the two queries to the local homeserver produce the same response. + """ + local_user_id = "@test:test" + remote_user_id = "@test:other" + request_body = {"device_keys": {remote_user_id: []}} + + response_devices = [ + { + "device_id": device_id, + "keys": { + "algorithms": ["dummy"], + "device_id": device_id, + "keys": {f"dummy:{device_id}": "dummy"}, + "signatures": {device_id: {f"dummy:{device_id}": "dummy"}}, + "unsigned": {}, + "user_id": "@test:other", + }, + } + for device_id in device_ids + ] + + response_body = { + "devices": response_devices, + "user_id": remote_user_id, + "stream_id": 12345, # an integer, according to the spec + } + + e2e_handler = self.hs.get_e2e_keys_handler() + + # Pretend we're sharing a room with the user we're querying. If not, + # `_query_devices_for_destination` will return early. + mock_get_rooms = mock.patch.object( + self.store, + "get_rooms_for_user", + new_callable=mock.MagicMock, + return_value=make_awaitable(["some_room_id"]), + ) + mock_request = mock.patch.object( + self.hs.get_federation_client(), + "query_user_devices", + new_callable=mock.MagicMock, + return_value=make_awaitable(response_body), + ) + + with mock_get_rooms, mock_request as mocked_federation_request: + # Make the first query and sanity check it succeeds. + response_1 = self.get_success( + e2e_handler.query_devices( + request_body, + timeout=10, + from_user_id=local_user_id, + from_device_id="some_device_id", + ) + ) + self.assertEqual(response_1["failures"], {}) + + # We should have made a federation request to do so. + mocked_federation_request.assert_called_once() + + # Reset the mock so we can prove we don't make a second federation request. + mocked_federation_request.reset_mock() + + # Repeat the query. 
+ response_2 = self.get_success( + e2e_handler.query_devices( + request_body, + timeout=10, + from_user_id=local_user_id, + from_device_id="some_device_id", + ) + ) + self.assertEqual(response_2["failures"], {}) + + # We should not have made a second federation request. + mocked_federation_request.assert_not_called() + + # The two requests to the local homeserver should be identical. + self.assertEqual(response_1, response_2) diff --git a/tests/handlers/test_room_summary.py b/tests/handlers/test_room_summary.py index e5a6a6c747..ce3ebcf2f2 100644 --- a/tests/handlers/test_room_summary.py +++ b/tests/handlers/test_room_summary.py @@ -253,6 +253,38 @@ class SpaceSummaryTestCase(unittest.HomeserverTestCase): ) self._assert_hierarchy(result, expected) + def test_large_space(self): + """Test a space with a large number of rooms.""" + rooms = [self.room] + # Make at least 51 rooms that are part of the space. + for _ in range(55): + room = self.helper.create_room_as(self.user, tok=self.token) + self._add_child(self.space, room, self.token) + rooms.append(room) + + result = self.get_success(self.handler.get_space_summary(self.user, self.space)) + # The spaces result should have the space and the first 50 rooms in it, + # along with the links from space -> room for those 50 rooms. + expected = [(self.space, rooms[:50])] + [(room, []) for room in rooms[:49]] + self._assert_rooms(result, expected) + + # The result should have the space and the rooms in it, along with the links + # from space -> room. + expected = [(self.space, rooms)] + [(room, []) for room in rooms] + + # Make two requests to fully paginate the results. + result = self.get_success( + self.handler.get_room_hierarchy(create_requester(self.user), self.space) + ) + result2 = self.get_success( + self.handler.get_room_hierarchy( + create_requester(self.user), self.space, from_token=result["next_batch"] + ) + ) + # Combine the results. 
+ result["rooms"] += result2["rooms"] + self._assert_hierarchy(result, expected) + def test_visibility(self): """A user not in a space cannot inspect it.""" user2 = self.register_user("user2", "pass") diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py index 638186f173..07a760e91a 100644 --- a/tests/handlers/test_sync.py +++ b/tests/handlers/test_sync.py @@ -11,15 +11,14 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from typing import Optional -from unittest.mock import Mock +from unittest.mock import MagicMock, Mock, patch from synapse.api.constants import EventTypes, JoinRules from synapse.api.errors import Codes, ResourceLimitError from synapse.api.filtering import Filtering from synapse.api.room_versions import RoomVersions -from synapse.handlers.sync import SyncConfig +from synapse.handlers.sync import SyncConfig, SyncResult from synapse.rest import admin from synapse.rest.client import knock, login, room from synapse.server import HomeServer @@ -27,6 +26,7 @@ from synapse.types import UserID, create_requester import tests.unittest import tests.utils +from tests.test_utils import make_awaitable class SyncTestCase(tests.unittest.HomeserverTestCase): @@ -186,6 +186,97 @@ class SyncTestCase(tests.unittest.HomeserverTestCase): self.assertNotIn(invite_room, [r.room_id for r in result.invited]) self.assertNotIn(knock_room, [r.room_id for r in result.knocked]) + def test_ban_wins_race_with_join(self): + """Rooms shouldn't appear under "joined" if a join loses a race to a ban. + + A complicated edge case. Imagine the following scenario: + + * you attempt to join a room + * racing with that is a ban which comes in over federation, which ends up with + an earlier stream_ordering than the join. 
+ * you get a sync response with a sync token which is _after_ the ban, but before + the join + * now your join lands; it is a valid event because its `prev_event`s predate the + ban, but will not make it into current_state_events (because bans win over + joins in state res, essentially). + * When we do a sync from the incremental sync, the only event in the timeline + is your join ... and yet you aren't joined. + + The ban coming in over federation isn't crucial for this behaviour; the key + requirements are: + 1. the homeserver generates a join event with prev_events that precede the ban + (so that it passes the "are you banned" test) + 2. the join event has a stream_ordering after that of the ban. + + We use monkeypatching to artificially trigger condition (1). + """ + # A local user Alice creates a room. + owner = self.register_user("alice", "password") + owner_tok = self.login(owner, "password") + room_id = self.helper.create_room_as(owner, is_public=True, tok=owner_tok) + + # Do a sync as Alice to get the latest event in the room. + alice_sync_result: SyncResult = self.get_success( + self.sync_handler.wait_for_sync_for_user( + create_requester(owner), generate_sync_config(owner) + ) + ) + self.assertEqual(len(alice_sync_result.joined), 1) + self.assertEqual(alice_sync_result.joined[0].room_id, room_id) + last_room_creation_event_id = ( + alice_sync_result.joined[0].timeline.events[-1].event_id + ) + + # Eve, a ne'er-do-well, registers. + eve = self.register_user("eve", "password") + eve_token = self.login(eve, "password") + + # Alice preemptively bans Eve. + self.helper.ban(room_id, owner, eve, tok=owner_tok) + + # Eve syncs. + eve_requester = create_requester(eve) + eve_sync_config = generate_sync_config(eve) + eve_sync_after_ban: SyncResult = self.get_success( + self.sync_handler.wait_for_sync_for_user(eve_requester, eve_sync_config) + ) + + # Sanity check this sync result. We shouldn't be joined to the room. 
+ self.assertEqual(eve_sync_after_ban.joined, []) + + # Eve tries to join the room. We monkey patch the internal logic which selects + # the prev_events used when creating the join event, such that the ban does not + # precede the join. + mocked_get_prev_events = patch.object( + self.hs.get_datastore(), + "get_prev_events_for_room", + new_callable=MagicMock, + return_value=make_awaitable([last_room_creation_event_id]), + ) + with mocked_get_prev_events: + self.helper.join(room_id, eve, tok=eve_token) + + # Eve makes a second, incremental sync. + eve_incremental_sync_after_join: SyncResult = self.get_success( + self.sync_handler.wait_for_sync_for_user( + eve_requester, + eve_sync_config, + since_token=eve_sync_after_ban.next_batch, + ) + ) + # Eve should not see herself as joined to the room. + self.assertEqual(eve_incremental_sync_after_join.joined, []) + + # If we did a third initial sync, we should _still_ see eve is not joined to the room. + eve_initial_sync_after_join: SyncResult = self.get_success( + self.sync_handler.wait_for_sync_for_user( + eve_requester, + eve_sync_config, + since_token=None, + ) + ) + self.assertEqual(eve_initial_sync_after_join.joined, []) + _request_key = 0 diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py index 742f194257..b70350b6f1 100644 --- a/tests/rest/admin/test_federation.py +++ b/tests/rest/admin/test_federation.py @@ -314,15 +314,12 @@ class FederationTestCase(unittest.HomeserverTestCase): retry_interval, last_successful_stream_ordering, ) in dest: - self.get_success( - self.store.set_destination_retry_timings( - destination, failure_ts, retry_last_ts, retry_interval - ) - ) - self.get_success( - self.store.set_destination_last_successful_stream_ordering( - destination, last_successful_stream_ordering - ) + self._create_destination( + destination, + failure_ts, + retry_last_ts, + retry_interval, + last_successful_stream_ordering, ) # order by default (destination) @@ -413,11 +410,9 @@ class 
FederationTestCase(unittest.HomeserverTestCase): _search_test(None, "foo") _search_test(None, "bar") - def test_get_single_destination(self) -> None: - """ - Get one specific destinations. - """ - self._create_destinations(5) + def test_get_single_destination_with_retry_timings(self) -> None: + """Get one specific destination which has retry timings.""" + self._create_destinations(1) channel = self.make_request( "GET", @@ -432,6 +427,53 @@ class FederationTestCase(unittest.HomeserverTestCase): # convert channel.json_body into a List self._check_fields([channel.json_body]) + def test_get_single_destination_no_retry_timings(self) -> None: + """Get one specific destination which has no retry timings.""" + self._create_destination("sub0.example.com") + + channel = self.make_request( + "GET", + self.url + "/sub0.example.com", + access_token=self.admin_user_tok, + ) + + self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) + self.assertEqual("sub0.example.com", channel.json_body["destination"]) + self.assertEqual(0, channel.json_body["retry_last_ts"]) + self.assertEqual(0, channel.json_body["retry_interval"]) + self.assertIsNone(channel.json_body["failure_ts"]) + self.assertIsNone(channel.json_body["last_successful_stream_ordering"]) + + def _create_destination( + self, + destination: str, + failure_ts: Optional[int] = None, + retry_last_ts: int = 0, + retry_interval: int = 0, + last_successful_stream_ordering: Optional[int] = None, + ) -> None: + """Create one specific destination + + Args: + destination: the destination we have successfully sent to + failure_ts: when the server started failing (ms since epoch) + retry_last_ts: time of last retry attempt in unix epoch ms + retry_interval: how long until next retry in ms + last_successful_stream_ordering: the stream_ordering of the most + recent successfully-sent PDU + """ + self.get_success( + self.store.set_destination_retry_timings( + destination, failure_ts, retry_last_ts, retry_interval + ) + ) + if 
last_successful_stream_ordering is not None: + self.get_success( + self.store.set_destination_last_successful_stream_ordering( + destination, last_successful_stream_ordering + ) + ) + def _create_destinations(self, number_destinations: int) -> None: """Create a number of destinations @@ -440,10 +482,7 @@ class FederationTestCase(unittest.HomeserverTestCase): """ for i in range(0, number_destinations): dest = f"sub{i}.example.com" - self.get_success(self.store.set_destination_retry_timings(dest, 50, 50, 50)) - self.get_success( - self.store.set_destination_last_successful_stream_ordering(dest, 100) - ) + self._create_destination(dest, 50, 50, 50, 100) def _check_fields(self, content: List[JsonDict]) -> None: """Checks that the expected destination attributes are present in content diff --git a/tests/rest/admin/test_registration_tokens.py b/tests/rest/admin/test_registration_tokens.py index 81f3ac7f04..8513b1d2df 100644 --- a/tests/rest/admin/test_registration_tokens.py +++ b/tests/rest/admin/test_registration_tokens.py @@ -223,20 +223,13 @@ class ManageRegistrationTokensTestCase(unittest.HomeserverTestCase): # Create all possible single character tokens tokens = [] for c in string.ascii_letters + string.digits + "._~-": - tokens.append( - { - "token": c, - "uses_allowed": None, - "pending": 0, - "completed": 0, - "expiry_time": None, - } - ) + tokens.append((c, None, 0, 0, None)) self.get_success( self.store.db_pool.simple_insert_many( "registration_tokens", - tokens, - "create_all_registration_tokens", + keys=("token", "uses_allowed", "pending", "completed", "expiry_time"), + values=tokens, + desc="create_all_registration_tokens", ) ) diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index e0b9fe8e91..9711405735 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -1181,6 +1181,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.other_user, device_id=None, valid_until_ms=None ) ) + self.url_prefix = 
"/_synapse/admin/v2/users/%s" self.url_other_user = self.url_prefix % self.other_user @@ -1188,7 +1189,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): """ If the user is not a server admin, an error is returned. """ - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" channel = self.make_request( "GET", @@ -1216,7 +1217,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): channel = self.make_request( "GET", - "/_synapse/admin/v2/users/@unknown_person:test", + self.url_prefix % "@unknown_person:test", access_token=self.admin_user_tok, ) @@ -1337,7 +1338,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): """ Check that a new admin user is created successfully. """ - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user (server admin) body = { @@ -1386,7 +1387,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): """ Check that a new regular user is created successfully. """ - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user body = { @@ -1478,7 +1479,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): ) # Register new user with admin API - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user channel = self.make_request( @@ -1515,7 +1516,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): ) # Register new user with admin API - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user channel = self.make_request( @@ -1545,7 +1546,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): Check that a new regular user is created successfully and got an email pusher. """ - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user body = { @@ -1588,7 +1589,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): Check that a new regular user is created successfully and got not an email pusher. 
""" - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user body = { @@ -2085,10 +2086,13 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertTrue(channel.json_body["deactivated"]) - self.assertIsNone(channel.json_body["password_hash"]) self.assertEqual(0, len(channel.json_body["threepids"])) self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"]) self.assertEqual("User", channel.json_body["displayname"]) + + # This key was removed intentionally. Ensure it is not accidentally re-included. + self.assertNotIn("password_hash", channel.json_body) + # the user is deactivated, the threepid will be deleted # Get user @@ -2101,11 +2105,13 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertTrue(channel.json_body["deactivated"]) - self.assertIsNone(channel.json_body["password_hash"]) self.assertEqual(0, len(channel.json_body["threepids"])) self.assertEqual("mxc://servername/mediaid", channel.json_body["avatar_url"]) self.assertEqual("User", channel.json_body["displayname"]) + # This key was removed intentionally. Ensure it is not accidentally re-included. 
+ self.assertNotIn("password_hash", channel.json_body) + @override_config({"user_directory": {"enabled": True, "search_all_users": True}}) def test_change_name_deactivate_user_user_directory(self): """ @@ -2177,9 +2183,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertFalse(channel.json_body["deactivated"]) - self.assertIsNotNone(channel.json_body["password_hash"]) self._is_erased("@user:test", False) + # This key was removed intentionally. Ensure it is not accidentally re-included. + self.assertNotIn("password_hash", channel.json_body) + @override_config({"password_config": {"localdb_enabled": False}}) def test_reactivate_user_localdb_disabled(self): """ @@ -2209,9 +2217,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertFalse(channel.json_body["deactivated"]) - self.assertIsNone(channel.json_body["password_hash"]) self._is_erased("@user:test", False) + # This key was removed intentionally. Ensure it is not accidentally re-included. + self.assertNotIn("password_hash", channel.json_body) + @override_config({"password_config": {"enabled": False}}) def test_reactivate_user_password_disabled(self): """ @@ -2241,9 +2251,11 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertEqual("@user:test", channel.json_body["name"]) self.assertFalse(channel.json_body["deactivated"]) - self.assertIsNone(channel.json_body["password_hash"]) self._is_erased("@user:test", False) + # This key was removed intentionally. Ensure it is not accidentally re-included. + self.assertNotIn("password_hash", channel.json_body) + def test_set_user_as_admin(self): """ Test setting the admin flag on a user. 
@@ -2328,7 +2340,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): Ensure an account can't accidentally be deactivated by using a str value for the deactivated body parameter """ - url = "/_synapse/admin/v2/users/@bob:test" + url = self.url_prefix % "@bob:test" # Create user channel = self.make_request( @@ -2392,18 +2404,20 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Deactivate the user. channel = self.make_request( "PUT", - "/_synapse/admin/v2/users/%s" % urllib.parse.quote(user_id), + self.url_prefix % urllib.parse.quote(user_id), access_token=self.admin_user_tok, content={"deactivated": True}, ) self.assertEqual(HTTPStatus.OK, channel.code, msg=channel.json_body) self.assertTrue(channel.json_body["deactivated"]) - self.assertIsNone(channel.json_body["password_hash"]) self._is_erased(user_id, False) d = self.store.mark_user_erased(user_id) self.assertIsNone(self.get_success(d)) self._is_erased(user_id, True) + # This key was removed intentionally. Ensure it is not accidentally re-included. + self.assertNotIn("password_hash", channel.json_body) + def _check_fields(self, content: JsonDict): """Checks that the expected user attributes are present in content @@ -2416,13 +2430,15 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertIn("admin", content) self.assertIn("deactivated", content) self.assertIn("shadow_banned", content) - self.assertIn("password_hash", content) self.assertIn("creation_ts", content) self.assertIn("appservice_id", content) self.assertIn("consent_server_notice_sent", content) self.assertIn("consent_version", content) self.assertIn("external_ids", content) + # This key was removed intentionally. Ensure it is not accidentally re-included. 
+ self.assertNotIn("password_hash", content) + class UserMembershipRestTestCase(unittest.HomeserverTestCase): diff --git a/tests/rest/client/test_relations.py b/tests/rest/client/test_relations.py index c026d526ef..ee26751430 100644 --- a/tests/rest/client/test_relations.py +++ b/tests/rest/client/test_relations.py @@ -93,11 +93,6 @@ class RelationsTestCase(unittest.HomeserverTestCase): channel.json_body, ) - def test_deny_membership(self): - """Test that we deny relations on membership events""" - channel = self._send_relation(RelationTypes.ANNOTATION, EventTypes.Member) - self.assertEquals(400, channel.code, channel.json_body) - def test_deny_invalid_event(self): """Test that we deny relations on non-existant events""" channel = self._send_relation( @@ -577,11 +572,11 @@ class RelationsTestCase(unittest.HomeserverTestCase): assert_bundle(channel.json_body["event"]["unsigned"].get("m.relations")) # Request sync. - # channel = self.make_request("GET", "/sync", access_token=self.user_token) - # self.assertEquals(200, channel.code, channel.json_body) - # room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"] - # self.assertTrue(room_timeline["limited"]) - # _find_and_assert_event(room_timeline["events"]) + channel = self.make_request("GET", "/sync", access_token=self.user_token) + self.assertEquals(200, channel.code, channel.json_body) + room_timeline = channel.json_body["rooms"]["join"][self.room]["timeline"] + self.assertTrue(room_timeline["limited"]) + _find_and_assert_event(room_timeline["events"]) # Note that /relations is tested separately in test_aggregation_get_event_for_thread # since it needs different data configured. @@ -1119,7 +1114,8 @@ class RelationsTestCase(unittest.HomeserverTestCase): relation_type: One of `RelationTypes` event_type: The type of the event to create key: The aggregation key used for m.annotation relation type. - content: The content of the created event. + content: The content of the created event. 
Will be modified to configure + the m.relates_to key based on the other provided parameters. access_token: The access token used to send the relation, defaults to `self.user_token` parent_id: The event_id this relation relates to. If None, then self.parent_id @@ -1130,17 +1126,21 @@ class RelationsTestCase(unittest.HomeserverTestCase): if not access_token: access_token = self.user_token - query = "" - if key: - query = "?key=" + urllib.parse.quote_plus(key.encode("utf-8")) - original_id = parent_id if parent_id else self.parent_id + if content is None: + content = {} + content["m.relates_to"] = { + "event_id": original_id, + "rel_type": relation_type, + } + if key is not None: + content["m.relates_to"]["key"] = key + channel = self.make_request( "POST", - "/_matrix/client/unstable/rooms/%s/send_relation/%s/%s/%s%s" - % (self.room, original_id, relation_type, event_type, query), - content or {}, + f"/_matrix/client/v3/rooms/{self.room}/send/{event_type}", + content, access_token=access_token, ) return channel diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index b58452195a..fe5b536d97 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -228,7 +228,7 @@ class RetentionTestCase(unittest.HomeserverTestCase): self.assertIsNotNone(event) time_now = self.clock.time_msec() - serialized = self.get_success(self.serializer.serialize_event(event, time_now)) + serialized = self.serializer.serialize_event(event, time_now) return serialized diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 1af5e5cee5..8424383580 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -196,6 +196,16 @@ class RestHelper: expect_code=expect_code, ) + def ban(self, room: str, src: str, targ: str, **kwargs: object): + """A convenience helper: `change_membership` with `membership` preset to "ban".""" + self.change_membership( + room=room, + src=src, + targ=targ, + 
membership=Membership.BAN, + **kwargs, + ) + def change_membership( self, room: str, diff --git a/tests/server.py b/tests/server.py index ca2b7a5b97..a0cd14ea45 100644 --- a/tests/server.py +++ b/tests/server.py @@ -14,6 +14,8 @@ import hashlib import json import logging +import os +import os.path import time import uuid import warnings @@ -71,6 +73,7 @@ from tests.utils import ( POSTGRES_HOST, POSTGRES_PASSWORD, POSTGRES_USER, + SQLITE_PERSIST_DB, USE_POSTGRES_FOR_TESTS, MockClock, default_config, @@ -739,9 +742,23 @@ def setup_test_homeserver( }, } else: + if SQLITE_PERSIST_DB: + # The current working directory is in _trial_temp, so this gets created within that directory. + test_db_location = os.path.abspath("test.db") + logger.debug("Will persist db to %s", test_db_location) + # Ensure each test gets a clean database. + try: + os.remove(test_db_location) + except FileNotFoundError: + pass + else: + logger.debug("Removed existing DB at %s", test_db_location) + else: + test_db_location = ":memory:" + database_config = { "name": "sqlite3", - "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1}, + "args": {"database": test_db_location, "cp_min": 1, "cp_max": 1}, } if "db_txn_limit" in kwargs: diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index ecfda7677e..632bbc9de7 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -515,17 +515,23 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): self.get_success( self.store.db_pool.simple_insert_many( table="federation_inbound_events_staging", + keys=( + "origin", + "room_id", + "received_ts", + "event_id", + "event_json", + "internal_metadata", + ), values=[ - { - "origin": "some_origin", - "room_id": room_id, - "received_ts": 0, - "event_id": f"$fake_event_id_{i + 1}", - "event_json": json_encoder.encode( - {"prev_events": [f"$fake_event_id_{i}"]} - ), - "internal_metadata": "{}", - } + ( + 
"some_origin", + room_id, + 0, + f"$fake_event_id_{i + 1}", + json_encoder.encode({"prev_events": [f"$fake_event_id_{i}"]}), + "{}", + ) for i in range(500) ], desc="test_prune_inbound_federation_queue", diff --git a/tests/test_federation.py b/tests/test_federation.py index 3eef1c4c05..2b9804aba0 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -17,7 +17,9 @@ from unittest.mock import Mock from twisted.internet.defer import succeed from synapse.api.errors import FederationError +from synapse.api.room_versions import RoomVersions from synapse.events import make_event_from_dict +from synapse.federation.federation_base import event_from_pdu_json from synapse.logging.context import LoggingContext from synapse.types import UserID, create_requester from synapse.util import Clock @@ -276,3 +278,73 @@ class MessageAcceptTests(unittest.HomeserverTestCase): "ed25519:" + remote_self_signing_key in self_signing_key["keys"].keys(), ) self.assertTrue(remote_self_signing_key in self_signing_key["keys"].values()) + + +class StripUnsignedFromEventsTestCase(unittest.TestCase): + def test_strip_unauthorized_unsigned_values(self): + event1 = { + "sender": "@baduser:test.serv", + "state_key": "@baduser:test.serv", + "event_id": "$event1:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.member", + "origin": "test.servx", + "content": {"membership": "join"}, + "auth_events": [], + "unsigned": {"malicious garbage": "hackz", "more warez": "more hackz"}, + } + filtered_event = event_from_pdu_json(event1, RoomVersions.V1) + # Make sure unauthorized fields are stripped from unsigned + self.assertNotIn("more warez", filtered_event.unsigned) + + def test_strip_event_maintains_allowed_fields(self): + event2 = { + "sender": "@baduser:test.serv", + "state_key": "@baduser:test.serv", + "event_id": "$event2:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.member", + "origin": "test.servx", + "auth_events": [], + "content": 
{"membership": "join"}, + "unsigned": { + "malicious garbage": "hackz", + "more warez": "more hackz", + "age": 14, + "invite_room_state": [], + }, + } + + filtered_event2 = event_from_pdu_json(event2, RoomVersions.V1) + self.assertIn("age", filtered_event2.unsigned) + self.assertEqual(14, filtered_event2.unsigned["age"]) + self.assertNotIn("more warez", filtered_event2.unsigned) + # Invite_room_state is allowed in events of type m.room.member + self.assertIn("invite_room_state", filtered_event2.unsigned) + self.assertEqual([], filtered_event2.unsigned["invite_room_state"]) + + def test_strip_event_removes_fields_based_on_event_type(self): + event3 = { + "sender": "@baduser:test.serv", + "state_key": "@baduser:test.serv", + "event_id": "$event3:test.serv", + "depth": 1000, + "origin_server_ts": 1, + "type": "m.room.power_levels", + "origin": "test.servx", + "content": {}, + "auth_events": [], + "unsigned": { + "malicious garbage": "hackz", + "more warez": "more hackz", + "age": 14, + "invite_room_state": [], + }, + } + filtered_event3 = event_from_pdu_json(event3, RoomVersions.V1) + self.assertIn("age", filtered_event3.unsigned) + # Invite_room_state field is only permitted in event type m.room.member + self.assertNotIn("invite_room_state", filtered_event3.unsigned) + self.assertNotIn("more warez", filtered_event3.unsigned) diff --git a/tests/test_utils/__init__.py b/tests/test_utils/__init__.py index 15ac2bfeba..f05a373aa0 100644 --- a/tests/test_utils/__init__.py +++ b/tests/test_utils/__init__.py @@ -19,7 +19,7 @@ import sys import warnings from asyncio import Future from binascii import unhexlify -from typing import Any, Awaitable, Callable, TypeVar +from typing import Awaitable, Callable, TypeVar from unittest.mock import Mock import attr @@ -46,7 +46,7 @@ def get_awaitable_result(awaitable: Awaitable[TV]) -> TV: raise Exception("awaitable has not yet completed") -def make_awaitable(result: Any) -> Awaitable[Any]: +def make_awaitable(result: TV) -> 
Awaitable[TV]: """ Makes an awaitable, suitable for mocking an `async` function. This uses Futures as they can be awaited multiple times so can be returned diff --git a/tests/utils.py b/tests/utils.py index 6d013e8518..c06fc320f3 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -42,6 +42,10 @@ POSTGRES_HOST = os.environ.get("SYNAPSE_POSTGRES_HOST", None) POSTGRES_PASSWORD = os.environ.get("SYNAPSE_POSTGRES_PASSWORD", None) POSTGRES_BASE_DB = "_synapse_unit_tests_base_%s" % (os.getpid(),) +# When debugging a specific test, it's occasionally useful to write the +# DB to disk and query it with the sqlite CLI. +SQLITE_PERSIST_DB = os.environ.get("SYNAPSE_TEST_PERSIST_SQLITE_DB") is not None + # the dbname we will connect to in order to create the base database. POSTGRES_DBNAME_FOR_INITIAL_CREATE = "postgres" |