diff --git a/CHANGES.md b/CHANGES.md
index 5de819ea1e..38906ade49 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,105 @@
+Synapse 1.21.0 (2020-10-01)
+===========================
+
+Features
+--------
+
+- Require the user to confirm that their password should be reset after clicking the email confirmation link. ([\#8004](https://github.com/matrix-org/synapse/issues/8004))
+- Add an admin API `GET /_synapse/admin/v1/event_reports` to read entries of the `event_reports` table. Contributed by @dklimpel. ([\#8217](https://github.com/matrix-org/synapse/issues/8217))
+- Consolidate the SSO error template across all configurations. ([\#8248](https://github.com/matrix-org/synapse/issues/8248), [\#8405](https://github.com/matrix-org/synapse/issues/8405))
+- Add a configuration option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number. ([\#8275](https://github.com/matrix-org/synapse/issues/8275), [\#8417](https://github.com/matrix-org/synapse/issues/8417))
+- Add experimental support for sharding the event persister. ([\#8294](https://github.com/matrix-org/synapse/issues/8294), [\#8387](https://github.com/matrix-org/synapse/issues/8387), [\#8396](https://github.com/matrix-org/synapse/issues/8396), [\#8419](https://github.com/matrix-org/synapse/issues/8419))
+- Add the room topic and avatar to the room details admin API. ([\#8305](https://github.com/matrix-org/synapse/issues/8305))
+- Add an admin API for querying rooms where a user is a member. Contributed by @dklimpel. ([\#8306](https://github.com/matrix-org/synapse/issues/8306))
+- Add `uk.half-shot.msc2778.login.application_service` login type to allow appservices to log in. ([\#8320](https://github.com/matrix-org/synapse/issues/8320))
+- Add a configuration option that allows existing users to log in with OpenID Connect. Contributed by @BBBSnowball and @OmmyZhang. ([\#8345](https://github.com/matrix-org/synapse/issues/8345))
+- Add prometheus metrics for replication requests. ([\#8406](https://github.com/matrix-org/synapse/issues/8406))
+- Support passing additional single sign-on parameters to the client. ([\#8413](https://github.com/matrix-org/synapse/issues/8413))
+- Add experimental reporting of metrics on expensive rooms for state resolution. ([\#8420](https://github.com/matrix-org/synapse/issues/8420))
+- Add an experimental prometheus metric to track the number of "large" rooms for state resolution. ([\#8425](https://github.com/matrix-org/synapse/issues/8425))
+- Add prometheus metrics to track federation delays. ([\#8430](https://github.com/matrix-org/synapse/issues/8430))
+
+
+Bugfixes
+--------
+
+- Fix a bug in the media repository where remote thumbnails with the same size but different crop methods would overwrite each other. Contributed by @deepbluev7. ([\#7124](https://github.com/matrix-org/synapse/issues/7124))
+- Fix inconsistent handling of non-existent push rules, and stop tracking the `enabled` state of removed push rules. ([\#7796](https://github.com/matrix-org/synapse/issues/7796))
+- Fix a longstanding bug when storing a media file with an empty `upload_name`. ([\#7905](https://github.com/matrix-org/synapse/issues/7905))
+- Fix messages not being sent over federation until an event is sent into the same room. ([\#8230](https://github.com/matrix-org/synapse/issues/8230), [\#8247](https://github.com/matrix-org/synapse/issues/8247), [\#8258](https://github.com/matrix-org/synapse/issues/8258), [\#8272](https://github.com/matrix-org/synapse/issues/8272), [\#8322](https://github.com/matrix-org/synapse/issues/8322))
+- Fix a longstanding bug where files that could not be thumbnailed would result in an Internal Server Error. ([\#8236](https://github.com/matrix-org/synapse/issues/8236))
+- Upgrade minimum version of `canonicaljson` to version 1.4.0, to fix a Unicode encoding issue. ([\#8262](https://github.com/matrix-org/synapse/issues/8262))
+- Fix a longstanding bug which could lead to incomplete database upgrades on SQLite. ([\#8265](https://github.com/matrix-org/synapse/issues/8265))
+- Fix stack overflow when stderr is redirected to the logging system, and the logging system encounters an error. ([\#8268](https://github.com/matrix-org/synapse/issues/8268))
+- Fix a bug which caused the logging system to report errors if `DEBUG` was enabled and no `context` filter was applied. ([\#8278](https://github.com/matrix-org/synapse/issues/8278))
+- Fix edge case where push could get delayed for a user until a later event was pushed. ([\#8287](https://github.com/matrix-org/synapse/issues/8287))
+- Fix fetching malformed events from remote servers. ([\#8324](https://github.com/matrix-org/synapse/issues/8324))
+- Fix an `UnboundLocalError` which occurred when appservices sent a malformed register request. ([\#8329](https://github.com/matrix-org/synapse/issues/8329))
+- Don't send push notifications to expired user accounts. ([\#8353](https://github.com/matrix-org/synapse/issues/8353))
+- Fix a regression in v1.19.0 with reactivating users through the admin API. ([\#8362](https://github.com/matrix-org/synapse/issues/8362))
+- Fix a bug where the length of the device name was not limited during device registration. ([\#8364](https://github.com/matrix-org/synapse/issues/8364))
+- Include `guest_access` in the fields that are checked for null bytes when updating `room_stats_state`. Broke in v1.7.2. ([\#8373](https://github.com/matrix-org/synapse/issues/8373))
+- Fix theoretical race condition where events are not sent down `/sync` if the synchrotron worker is restarted without restarting other workers. ([\#8374](https://github.com/matrix-org/synapse/issues/8374))
+- Fix a bug which could cause errors in rooms with malformed membership events, on servers using sqlite. ([\#8385](https://github.com/matrix-org/synapse/issues/8385))
+- Fix a bug introduced in v1.20.0 which caused the `synapse_port_db` script to fail. ([\#8386](https://github.com/matrix-org/synapse/issues/8386))
+- Fix "Re-starting finished log context" warning when receiving an event we already had over federation. ([\#8398](https://github.com/matrix-org/synapse/issues/8398))
+- Fix incorrect handling of timeouts on outgoing HTTP requests. ([\#8400](https://github.com/matrix-org/synapse/issues/8400))
+- Fix a regression in v1.20.0 in the `synapse_port_db` script regarding the `ui_auth_sessions_ips` table. ([\#8410](https://github.com/matrix-org/synapse/issues/8410))
+- Remove unnecessary 3PID registration check when resetting password via an email address. Bug introduced in v0.34.0rc2. ([\#8414](https://github.com/matrix-org/synapse/issues/8414))
+
+
+Improved Documentation
+----------------------
+
+- Add `/_synapse/client` to the reverse proxy documentation. ([\#8227](https://github.com/matrix-org/synapse/issues/8227))
+- Add note to the reverse proxy settings documentation about disabling Apache's mod_security2. Contributed by Julian Fietkau (@jfietkau). ([\#8375](https://github.com/matrix-org/synapse/issues/8375))
+- Improve description of `server_name` config option in `homeserver.yaml`. ([\#8415](https://github.com/matrix-org/synapse/issues/8415))
+
+
+Deprecations and Removals
+-------------------------
+
+- Drop support for `prometheus_client` older than 0.4.0. ([\#8426](https://github.com/matrix-org/synapse/issues/8426))
+
+
+Internal Changes
+----------------
+
+- Fix tests on distros which disable TLSv1.0. Contributed by @danc86. ([\#8208](https://github.com/matrix-org/synapse/issues/8208))
+- Simplify the distributor code to avoid unnecessary work. ([\#8216](https://github.com/matrix-org/synapse/issues/8216))
+- Remove the `populate_stats_process_rooms_2` background job and restore functionality to `populate_stats_process_rooms`. ([\#8243](https://github.com/matrix-org/synapse/issues/8243))
+- Clean up type hints for `PaginationConfig`. ([\#8250](https://github.com/matrix-org/synapse/issues/8250), [\#8282](https://github.com/matrix-org/synapse/issues/8282))
+- Track the latest event for every destination and room for catch-up after federation outage. ([\#8256](https://github.com/matrix-org/synapse/issues/8256))
+- Fix non-user visible bug in implementation of `MultiWriterIdGenerator.get_current_token_for_writer`. ([\#8257](https://github.com/matrix-org/synapse/issues/8257))
+- Switch to the JSON implementation from the standard library. ([\#8259](https://github.com/matrix-org/synapse/issues/8259))
+- Add type hints to `synapse.util.async_helpers`. ([\#8260](https://github.com/matrix-org/synapse/issues/8260))
+- Simplify tests that mock asynchronous functions. ([\#8261](https://github.com/matrix-org/synapse/issues/8261))
+- Add type hints to `StreamToken` and `RoomStreamToken` classes. ([\#8279](https://github.com/matrix-org/synapse/issues/8279))
+- Change `StreamToken.room_key` to be a `RoomStreamToken` instance. ([\#8281](https://github.com/matrix-org/synapse/issues/8281))
+- Refactor notifier code to correctly use the max event stream position. ([\#8288](https://github.com/matrix-org/synapse/issues/8288))
+- Use slotted classes where possible. ([\#8296](https://github.com/matrix-org/synapse/issues/8296))
+- Support testing the local Synapse checkout against the [Complement homeserver test suite](https://github.com/matrix-org/complement/). ([\#8317](https://github.com/matrix-org/synapse/issues/8317))
+- Update outdated usages of `metaclass` to Python 3 syntax. ([\#8326](https://github.com/matrix-org/synapse/issues/8326))
+- Move lint-related dependencies to a package extra, and update CONTRIBUTING.md to utilise this. ([\#8330](https://github.com/matrix-org/synapse/issues/8330), [\#8377](https://github.com/matrix-org/synapse/issues/8377))
+- Use the `admin_patterns` helper in additional locations. ([\#8331](https://github.com/matrix-org/synapse/issues/8331))
+- Fix test logging to allow braces in log output. ([\#8335](https://github.com/matrix-org/synapse/issues/8335))
+- Remove `__future__` imports related to Python 2 compatibility. ([\#8337](https://github.com/matrix-org/synapse/issues/8337))
+- Simplify `super()` calls to Python 3 syntax. ([\#8344](https://github.com/matrix-org/synapse/issues/8344))
+- Fix bad merge from `release-v1.20.0` branch to `develop`. ([\#8354](https://github.com/matrix-org/synapse/issues/8354))
+- Factor out a `_send_dummy_event_for_room` method. ([\#8370](https://github.com/matrix-org/synapse/issues/8370))
+- Improve logging of state resolution. ([\#8371](https://github.com/matrix-org/synapse/issues/8371))
+- Add type annotations to `SimpleHttpClient`. ([\#8372](https://github.com/matrix-org/synapse/issues/8372))
+- Refactor ID generators to use `async with` syntax. ([\#8383](https://github.com/matrix-org/synapse/issues/8383))
+- Add `EventStreamPosition` type. ([\#8388](https://github.com/matrix-org/synapse/issues/8388))
+- Create a mechanism for marking tests "logcontext clean". ([\#8399](https://github.com/matrix-org/synapse/issues/8399))
+- A pair of tiny cleanups in the federation request code. ([\#8401](https://github.com/matrix-org/synapse/issues/8401))
+- Add checks on startup that PostgreSQL sequences are consistent with their associated tables. ([\#8402](https://github.com/matrix-org/synapse/issues/8402))
+- Do not include appservice users when calculating the total MAU for a server. ([\#8404](https://github.com/matrix-org/synapse/issues/8404))
+- Typing fixes for `synapse.handlers.federation`. ([\#8422](https://github.com/matrix-org/synapse/issues/8422))
+- Various refactors to simplify stream token handling. ([\#8423](https://github.com/matrix-org/synapse/issues/8423))
+- Make stream token serializing/deserializing async. ([\#8427](https://github.com/matrix-org/synapse/issues/8427))
+
+
Synapse 1.20.1 (2020-09-24)
===========================
diff --git a/changelog.d/7124.bugfix b/changelog.d/7124.bugfix
deleted file mode 100644
index 8fd177780d..0000000000
--- a/changelog.d/7124.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug in the media repository where remote thumbnails with the same size but different crop methods would overwrite each other. Contributed by @deepbluev7.
diff --git a/changelog.d/7796.bugfix b/changelog.d/7796.bugfix
deleted file mode 100644
index 65e5eb42a2..0000000000
--- a/changelog.d/7796.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix inconsistent handling of non-existent push rules, and stop tracking the `enabled` state of removed push rules.
diff --git a/changelog.d/7905.bugfix b/changelog.d/7905.bugfix
deleted file mode 100644
index e60e624412..0000000000
--- a/changelog.d/7905.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a longstanding bug when storing a media file with an empty `upload_name`.
diff --git a/changelog.d/8004.feature b/changelog.d/8004.feature
deleted file mode 100644
index a91b75e0e0..0000000000
--- a/changelog.d/8004.feature
+++ /dev/null
@@ -1 +0,0 @@
-Require the user to confirm that their password should be reset after clicking the email confirmation link.
\ No newline at end of file
diff --git a/changelog.d/8208.misc b/changelog.d/8208.misc
deleted file mode 100644
index e65da88c46..0000000000
--- a/changelog.d/8208.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix tests on distros which disable TLSv1.0. Contributed by @danc86.
diff --git a/changelog.d/8216.misc b/changelog.d/8216.misc
deleted file mode 100644
index b38911b0e5..0000000000
--- a/changelog.d/8216.misc
+++ /dev/null
@@ -1 +0,0 @@
-Simplify the distributor code to avoid unnecessary work.
diff --git a/changelog.d/8217.feature b/changelog.d/8217.feature
deleted file mode 100644
index 899cbf14ef..0000000000
--- a/changelog.d/8217.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add an admin API `GET /_synapse/admin/v1/event_reports` to read entries of table `event_reports`. Contributed by @dklimpel.
\ No newline at end of file
diff --git a/changelog.d/8227.doc b/changelog.d/8227.doc
deleted file mode 100644
index 4a43015a83..0000000000
--- a/changelog.d/8227.doc
+++ /dev/null
@@ -1 +0,0 @@
-Add `/_synapse/client` to the reverse proxy documentation.
diff --git a/changelog.d/8230.bugfix b/changelog.d/8230.bugfix
deleted file mode 100644
index 532d0e22fe..0000000000
--- a/changelog.d/8230.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8236.bugfix b/changelog.d/8236.bugfix
deleted file mode 100644
index 6f04871015..0000000000
--- a/changelog.d/8236.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a longstanding bug where files that could not be thumbnailed would result in an Internal Server Error.
diff --git a/changelog.d/8243.misc b/changelog.d/8243.misc
deleted file mode 100644
index f7375d32d3..0000000000
--- a/changelog.d/8243.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove the 'populate_stats_process_rooms_2' background job and restore functionality to 'populate_stats_process_rooms'.
\ No newline at end of file
diff --git a/changelog.d/8247.bugfix b/changelog.d/8247.bugfix
deleted file mode 100644
index 532d0e22fe..0000000000
--- a/changelog.d/8247.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8248.feature b/changelog.d/8248.feature
deleted file mode 100644
index f3c4a74bc7..0000000000
--- a/changelog.d/8248.feature
+++ /dev/null
@@ -1 +0,0 @@
-Consolidate the SSO error template across all configuration.
diff --git a/changelog.d/8250.misc b/changelog.d/8250.misc
deleted file mode 100644
index b6896a9300..0000000000
--- a/changelog.d/8250.misc
+++ /dev/null
@@ -1 +0,0 @@
-Clean up type hints for `PaginationConfig`.
diff --git a/changelog.d/8256.misc b/changelog.d/8256.misc
deleted file mode 100644
index bf0ba76730..0000000000
--- a/changelog.d/8256.misc
+++ /dev/null
@@ -1 +0,0 @@
-Track the latest event for every destination and room for catch-up after federation outage.
diff --git a/changelog.d/8257.misc b/changelog.d/8257.misc
deleted file mode 100644
index 47ac583eb4..0000000000
--- a/changelog.d/8257.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix non-user visible bug in implementation of `MultiWriterIdGenerator.get_current_token_for_writer`.
diff --git a/changelog.d/8258.bugfix b/changelog.d/8258.bugfix
deleted file mode 100644
index 532d0e22fe..0000000000
--- a/changelog.d/8258.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8259.misc b/changelog.d/8259.misc
deleted file mode 100644
index a26779a664..0000000000
--- a/changelog.d/8259.misc
+++ /dev/null
@@ -1 +0,0 @@
-Switch to the JSON implementation from the standard library.
diff --git a/changelog.d/8260.misc b/changelog.d/8260.misc
deleted file mode 100644
index 164eea8b59..0000000000
--- a/changelog.d/8260.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add type hints to `synapse.util.async_helpers`.
diff --git a/changelog.d/8261.misc b/changelog.d/8261.misc
deleted file mode 100644
index bc91e9375c..0000000000
--- a/changelog.d/8261.misc
+++ /dev/null
@@ -1 +0,0 @@
-Simplify tests that mock asynchronous functions.
diff --git a/changelog.d/8262.bugfix b/changelog.d/8262.bugfix
deleted file mode 100644
index 2b84927de3..0000000000
--- a/changelog.d/8262.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Upgrade canonicaljson to version 1.4.0 to fix an unicode encoding issue.
diff --git a/changelog.d/8265.bugfix b/changelog.d/8265.bugfix
deleted file mode 100644
index 981a836d21..0000000000
--- a/changelog.d/8265.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix logstanding bug which could lead to incomplete database upgrades on SQLite.
diff --git a/changelog.d/8268.bugfix b/changelog.d/8268.bugfix
deleted file mode 100644
index 4b15a60253..0000000000
--- a/changelog.d/8268.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix stack overflow when stderr is redirected to the logging system, and the logging system encounters an error.
diff --git a/changelog.d/8272.bugfix b/changelog.d/8272.bugfix
deleted file mode 100644
index 532d0e22fe..0000000000
--- a/changelog.d/8272.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8275.feature b/changelog.d/8275.feature
deleted file mode 100644
index 17549c3df3..0000000000
--- a/changelog.d/8275.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number.
\ No newline at end of file
diff --git a/changelog.d/8278.bugfix b/changelog.d/8278.bugfix
deleted file mode 100644
index 50e40ca2a9..0000000000
--- a/changelog.d/8278.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug which cause the logging system to report errors, if `DEBUG` was enabled and no `context` filter was applied.
diff --git a/changelog.d/8279.misc b/changelog.d/8279.misc
deleted file mode 100644
index 99f669001f..0000000000
--- a/changelog.d/8279.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add type hints to `StreamToken` and `RoomStreamToken` classes.
diff --git a/changelog.d/8281.misc b/changelog.d/8281.misc
deleted file mode 100644
index 74357120a7..0000000000
--- a/changelog.d/8281.misc
+++ /dev/null
@@ -1 +0,0 @@
-Change `StreamToken.room_key` to be a `RoomStreamToken` instance.
diff --git a/changelog.d/8282.misc b/changelog.d/8282.misc
deleted file mode 100644
index b6896a9300..0000000000
--- a/changelog.d/8282.misc
+++ /dev/null
@@ -1 +0,0 @@
-Clean up type hints for `PaginationConfig`.
diff --git a/changelog.d/8287.bugfix b/changelog.d/8287.bugfix
deleted file mode 100644
index 839781aa07..0000000000
--- a/changelog.d/8287.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix edge case where push could get delayed for a user until a later event was pushed.
diff --git a/changelog.d/8288.misc b/changelog.d/8288.misc
deleted file mode 100644
index c08a53a5ee..0000000000
--- a/changelog.d/8288.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor notifier code to correctly use the max event stream position.
diff --git a/changelog.d/8294.feature b/changelog.d/8294.feature
deleted file mode 100644
index b363e929ea..0000000000
--- a/changelog.d/8294.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for sharding event persister.
diff --git a/changelog.d/8296.misc b/changelog.d/8296.misc
deleted file mode 100644
index f593a5b347..0000000000
--- a/changelog.d/8296.misc
+++ /dev/null
@@ -1 +0,0 @@
-Use slotted classes where possible.
diff --git a/changelog.d/8305.feature b/changelog.d/8305.feature
deleted file mode 100644
index 862dfdf959..0000000000
--- a/changelog.d/8305.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add the room topic and avatar to the room details admin API.
diff --git a/changelog.d/8306.feature b/changelog.d/8306.feature
deleted file mode 100644
index 5c23da4030..0000000000
--- a/changelog.d/8306.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add an admin API for querying rooms where a user is a member. Contributed by @dklimpel.
\ No newline at end of file
diff --git a/changelog.d/8317.feature b/changelog.d/8317.feature
deleted file mode 100644
index f9edda099c..0000000000
--- a/changelog.d/8317.feature
+++ /dev/null
@@ -1 +0,0 @@
-Support testing the local Synapse checkout against the [Complement homeserver test suite](https://github.com/matrix-org/complement/).
\ No newline at end of file
diff --git a/changelog.d/8320.feature b/changelog.d/8320.feature
deleted file mode 100644
index 475a5fe62d..0000000000
--- a/changelog.d/8320.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add `uk.half-shot.msc2778.login.application_service` login type to allow appservices to login.
diff --git a/changelog.d/8322.bugfix b/changelog.d/8322.bugfix
deleted file mode 100644
index 532d0e22fe..0000000000
--- a/changelog.d/8322.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8324.bugfix b/changelog.d/8324.bugfix
deleted file mode 100644
index 32788a9284..0000000000
--- a/changelog.d/8324.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix fetching events from remote servers that are malformed.
diff --git a/changelog.d/8326.misc b/changelog.d/8326.misc
deleted file mode 100644
index 985d2c027a..0000000000
--- a/changelog.d/8326.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update outdated usages of `metaclass` to python 3 syntax.
\ No newline at end of file
diff --git a/changelog.d/8329.bugfix b/changelog.d/8329.bugfix
deleted file mode 100644
index 2f71f1f4b9..0000000000
--- a/changelog.d/8329.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix UnboundLocalError from occuring when appservices send malformed register request.
\ No newline at end of file
diff --git a/changelog.d/8330.misc b/changelog.d/8330.misc
deleted file mode 100644
index fbfdd52473..0000000000
--- a/changelog.d/8330.misc
+++ /dev/null
@@ -1 +0,0 @@
-Move lint-related dependencies to package-extra field, update CONTRIBUTING.md to utilise this.
diff --git a/changelog.d/8331.misc b/changelog.d/8331.misc
deleted file mode 100644
index 0e1bae20ef..0000000000
--- a/changelog.d/8331.misc
+++ /dev/null
@@ -1 +0,0 @@
-Use the `admin_patterns` helper in additional locations.
diff --git a/changelog.d/8335.misc b/changelog.d/8335.misc
deleted file mode 100644
index 7e0a4c7d83..0000000000
--- a/changelog.d/8335.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix test logging to allow braces in log output.
\ No newline at end of file
diff --git a/changelog.d/8337.misc b/changelog.d/8337.misc
deleted file mode 100644
index 4daf272204..0000000000
--- a/changelog.d/8337.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove `__future__` imports related to Python 2 compatibility.
\ No newline at end of file
diff --git a/changelog.d/8344.misc b/changelog.d/8344.misc
deleted file mode 100644
index 0b342d5137..0000000000
--- a/changelog.d/8344.misc
+++ /dev/null
@@ -1 +0,0 @@
-Simplify `super()` calls to Python 3 syntax.
diff --git a/changelog.d/8345.feature b/changelog.d/8345.feature
deleted file mode 100644
index 4ee5b6a56e..0000000000
--- a/changelog.d/8345.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add a configuration option that allows existing users to log in with OpenID Connect. Contributed by @BBBSnowball and @OmmyZhang.
diff --git a/changelog.d/8353.bugfix b/changelog.d/8353.bugfix
deleted file mode 100644
index 45fc0adb8d..0000000000
--- a/changelog.d/8353.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Don't send push notifications to expired user accounts.
diff --git a/changelog.d/8354.misc b/changelog.d/8354.misc
deleted file mode 100644
index 1d33cde2da..0000000000
--- a/changelog.d/8354.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix bad merge from `release-v1.20.0` branch to `develop`.
diff --git a/changelog.d/8362.bugfix b/changelog.d/8362.bugfix
deleted file mode 100644
index 4e50067c87..0000000000
--- a/changelog.d/8362.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fixed a regression in v1.19.0 with reactivating users through the admin API.
diff --git a/changelog.d/8364.bugfix b/changelog.d/8364.bugfix
deleted file mode 100644
index 7b82cbc388..0000000000
--- a/changelog.d/8364.bugfix
+++ /dev/null
@@ -1,2 +0,0 @@
-Fix a bug where during device registration the length of the device name wasn't
-limited.
diff --git a/changelog.d/8370.misc b/changelog.d/8370.misc
deleted file mode 100644
index 1aaac1e0bf..0000000000
--- a/changelog.d/8370.misc
+++ /dev/null
@@ -1 +0,0 @@
-Factor out a `_send_dummy_event_for_room` method.
diff --git a/changelog.d/8371.misc b/changelog.d/8371.misc
deleted file mode 100644
index 6a54a9496a..0000000000
--- a/changelog.d/8371.misc
+++ /dev/null
@@ -1 +0,0 @@
-Improve logging of state resolution.
diff --git a/changelog.d/8372.misc b/changelog.d/8372.misc
deleted file mode 100644
index a56e36de4b..0000000000
--- a/changelog.d/8372.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add type annotations to `SimpleHttpClient`.
diff --git a/changelog.d/8373.bugfix b/changelog.d/8373.bugfix
deleted file mode 100644
index e9d66a2088..0000000000
--- a/changelog.d/8373.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Include `guest_access` in the fields that are checked for null bytes when updating `room_stats_state`. Broke in v1.7.2.
\ No newline at end of file
diff --git a/changelog.d/8374.bugfix b/changelog.d/8374.bugfix
deleted file mode 100644
index 155bc3404f..0000000000
--- a/changelog.d/8374.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix theoretical race condition where events are not sent down `/sync` if the synchrotron worker is restarted without restarting other workers.
diff --git a/changelog.d/8375.doc b/changelog.d/8375.doc
deleted file mode 100644
index d291fb92fa..0000000000
--- a/changelog.d/8375.doc
+++ /dev/null
@@ -1 +0,0 @@
-Add note to the reverse proxy settings documentation about disabling Apache's mod_security2. Contributed by Julian Fietkau (@jfietkau).
diff --git a/changelog.d/8377.misc b/changelog.d/8377.misc
deleted file mode 100644
index fbfdd52473..0000000000
--- a/changelog.d/8377.misc
+++ /dev/null
@@ -1 +0,0 @@
-Move lint-related dependencies to package-extra field, update CONTRIBUTING.md to utilise this.
diff --git a/changelog.d/8383.misc b/changelog.d/8383.misc
deleted file mode 100644
index cb8318bf57..0000000000
--- a/changelog.d/8383.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor ID generators to use `async with` syntax.
diff --git a/changelog.d/8385.bugfix b/changelog.d/8385.bugfix
deleted file mode 100644
index c42502a8e0..0000000000
--- a/changelog.d/8385.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug which could cause errors in rooms with malformed membership events, on servers using sqlite.
diff --git a/changelog.d/8386.bugfix b/changelog.d/8386.bugfix
deleted file mode 100644
index 24983a1e95..0000000000
--- a/changelog.d/8386.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a bug introduced in v1.20.0 which caused the `synapse_port_db` script to fail.
diff --git a/changelog.d/8387.feature b/changelog.d/8387.feature
deleted file mode 100644
index b363e929ea..0000000000
--- a/changelog.d/8387.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for sharding event persister.
diff --git a/changelog.d/8388.misc b/changelog.d/8388.misc
deleted file mode 100644
index aaaef88b66..0000000000
--- a/changelog.d/8388.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add `EventStreamPosition` type.
diff --git a/changelog.d/8396.feature b/changelog.d/8396.feature
deleted file mode 100644
index b363e929ea..0000000000
--- a/changelog.d/8396.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for sharding event persister.
diff --git a/changelog.d/8398.bugfix b/changelog.d/8398.bugfix
deleted file mode 100644
index e432aeebf1..0000000000
--- a/changelog.d/8398.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix "Re-starting finished log context" warning when receiving an event we already had over federation.
diff --git a/changelog.d/8399.misc b/changelog.d/8399.misc
deleted file mode 100644
index ce6e8123cf..0000000000
--- a/changelog.d/8399.misc
+++ /dev/null
@@ -1 +0,0 @@
-Create a mechanism for marking tests "logcontext clean".
diff --git a/changelog.d/8400.bugfix b/changelog.d/8400.bugfix
deleted file mode 100644
index 835658ba5e..0000000000
--- a/changelog.d/8400.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix incorrect handling of timeouts on outgoing HTTP requests.
diff --git a/changelog.d/8401.misc b/changelog.d/8401.misc
deleted file mode 100644
index 27fd7ab129..0000000000
--- a/changelog.d/8401.misc
+++ /dev/null
@@ -1 +0,0 @@
-A pair of tiny cleanups in the federation request code.
diff --git a/changelog.d/8402.misc b/changelog.d/8402.misc
deleted file mode 100644
index ad1804d207..0000000000
--- a/changelog.d/8402.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add checks on startup that PostgreSQL sequences are consistent with their associated tables.
diff --git a/changelog.d/8404.misc b/changelog.d/8404.misc
deleted file mode 100644
index 7aadded6c1..0000000000
--- a/changelog.d/8404.misc
+++ /dev/null
@@ -1 +0,0 @@
-Do not include appservice users when calculating the total MAU for a server.
diff --git a/changelog.d/8405.feature b/changelog.d/8405.feature
deleted file mode 100644
index f3c4a74bc7..0000000000
--- a/changelog.d/8405.feature
+++ /dev/null
@@ -1 +0,0 @@
-Consolidate the SSO error template across all configuration.
diff --git a/changelog.d/8406.feature b/changelog.d/8406.feature
deleted file mode 100644
index 1c6472ae7e..0000000000
--- a/changelog.d/8406.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add prometheus metrics for replication requests.
diff --git a/changelog.d/8410.bugfix b/changelog.d/8410.bugfix
deleted file mode 100644
index 1323ddc525..0000000000
--- a/changelog.d/8410.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix a v1.20.0 regression in the `synapse_port_db` script regarding the `ui_auth_sessions_ips` table.
diff --git a/changelog.d/8414.bugfix b/changelog.d/8414.bugfix
deleted file mode 100644
index 315876e892..0000000000
--- a/changelog.d/8414.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Remove unnecessary 3PID registration check when resetting password via an email address. Bug introduced in v0.34.0rc2.
\ No newline at end of file
diff --git a/changelog.d/8415.doc b/changelog.d/8415.doc
deleted file mode 100644
index 28b5798533..0000000000
--- a/changelog.d/8415.doc
+++ /dev/null
@@ -1 +0,0 @@
-Improve description of `server_name` config option in `homserver.yaml`.
\ No newline at end of file
diff --git a/changelog.d/8417.feature b/changelog.d/8417.feature
deleted file mode 100644
index 17549c3df3..0000000000
--- a/changelog.d/8417.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add a config option to specify a whitelist of domains that a user can be redirected to after validating their email or phone number.
\ No newline at end of file
diff --git a/changelog.d/8419.feature b/changelog.d/8419.feature
deleted file mode 100644
index b363e929ea..0000000000
--- a/changelog.d/8419.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add experimental support for sharding event persister.
diff --git a/changelog.d/8422.misc b/changelog.d/8422.misc
deleted file mode 100644
index 03fba120c6..0000000000
--- a/changelog.d/8422.misc
+++ /dev/null
@@ -1 +0,0 @@
-Typing fixes for `synapse.handlers.federation`.
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index 4831484ef2..e4b6fad449 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -697,6 +697,7 @@ acme:
#tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}]
+## Federation ##
# Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit
@@ -730,6 +731,17 @@ federation_ip_range_blacklist:
- 'fe80::/64'
- 'fc00::/7'
+# Report prometheus metrics on the age of PDUs being sent to and received from
+# the following domains. This can be used to give an idea of "delay" on inbound
+# and outbound federation, though be aware that any delay can be due to problems
+# at either end or with the intermediate network.
+#
+# By default, no domains are monitored in this way.
+#
+#federation_metrics_domains:
+# - matrix.org
+# - example.com
+
## Caching ##
@@ -1923,6 +1935,14 @@ oidc_config:
#
#display_name_template: "{{ user.given_name }} {{ user.last_name }}"
+ # Jinja2 templates for extra attributes to send back to the client during
+ # login.
+ #
+  # Note that these are non-standard, and clients will ignore them unless modified to support them.
+ #
+ #extra_attributes:
+  #  birthdate: "{{ user.birthdate }}"
+
# Enable CAS for registration and login.
diff --git a/docs/sso_mapping_providers.md b/docs/sso_mapping_providers.md
index abea432343..32b06aa2c5 100644
--- a/docs/sso_mapping_providers.md
+++ b/docs/sso_mapping_providers.md
@@ -57,7 +57,7 @@ A custom mapping provider must specify the following methods:
- This method must return a string, which is the unique identifier for the
user. Commonly the ``sub`` claim of the response.
* `map_user_attributes(self, userinfo, token)`
- - This method should be async.
+ - This method must be async.
- Arguments:
- `userinfo` - A `authlib.oidc.core.claims.UserInfo` object to extract user
information from.
@@ -66,6 +66,18 @@ A custom mapping provider must specify the following methods:
- Returns a dictionary with two keys:
- localpart: A required string, used to generate the Matrix ID.
- displayname: An optional string, the display name for the user.
+* `get_extra_attributes(self, userinfo, token)`
+ - This method must be async.
+ - Arguments:
+ - `userinfo` - A `authlib.oidc.core.claims.UserInfo` object to extract user
+ information from.
+ - `token` - A dictionary which includes information necessary to make
+ further requests to the OpenID provider.
+ - Returns a dictionary that is suitable to be serialized to JSON. This
+ will be returned as part of the response during a successful login.
+
+  Note that care should be taken not to overwrite any of the parameters
+ usually returned as part of the [login response](https://matrix.org/docs/spec/client_server/latest#post-matrix-client-r0-login).
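
A minimal sketch of a provider method implementing this (the provider class
name and the `birthdate` claim are hypothetical; any JSON-serializable values
work):

    class ExampleMappingProvider:
        # ... get_remote_user_id, map_user_attributes, etc., as described above ...

        async def get_extra_attributes(self, userinfo, token):
            # `userinfo` behaves like a mapping of OIDC claims. Return only
            # JSON-serializable values, and avoid keys that collide with the
            # standard login response fields (e.g. "access_token", "user_id").
            return {"birthdate": userinfo.get("birthdate")}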
### Default OpenID Mapping Provider
diff --git a/docs/workers.md b/docs/workers.md
index df0ac84d94..ad4d8ca9f2 100644
--- a/docs/workers.md
+++ b/docs/workers.md
@@ -243,6 +243,22 @@ for the room are in flight:
^/_matrix/client/(api/v1|r0|unstable)/rooms/.*/messages$
+Additionally, the following endpoints should be included if Synapse is configured
+to use SSO (you only need to include the ones for whichever SSO provider you're
+using):
+
+ # OpenID Connect requests.
+ ^/_matrix/client/(api/v1|r0|unstable)/login/sso/redirect$
+ ^/_synapse/oidc/callback$
+
+ # SAML requests.
+ ^/_matrix/client/(api/v1|r0|unstable)/login/sso/redirect$
+ ^/_matrix/saml2/authn_response$
+
+ # CAS requests.
+ ^/_matrix/client/(api/v1|r0|unstable)/login/(cas|sso)/redirect$
+ ^/_matrix/client/(api/v1|r0|unstable)/login/cas/ticket$
+
Note that a HTTP listener with `client` and `federation` resources must be
configured in the `worker_listeners` option in the worker config.
diff --git a/synapse/__init__.py b/synapse/__init__.py
index e40b582bd5..57f818125a 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -48,7 +48,7 @@ try:
except ImportError:
pass
-__version__ = "1.20.1"
+__version__ = "1.21.0"
if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
# We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/config/_util.py b/synapse/config/_util.py
index cd31b1c3c9..c74969a977 100644
--- a/synapse/config/_util.py
+++ b/synapse/config/_util.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Any, List
+from typing import Any, Iterable
import jsonschema
@@ -20,7 +20,9 @@ from synapse.config._base import ConfigError
from synapse.types import JsonDict
-def validate_config(json_schema: JsonDict, config: Any, config_path: List[str]) -> None:
+def validate_config(
+ json_schema: JsonDict, config: Any, config_path: Iterable[str]
+) -> None:
"""Validates a config setting against a JsonSchema definition
This can be used to validate a section of the config file against a schema
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index 2c77d8f85b..ffd8fca54e 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -17,7 +17,8 @@ from typing import Optional
from netaddr import IPSet
-from ._base import Config, ConfigError
+from synapse.config._base import Config, ConfigError
+from synapse.config._util import validate_config
class FederationConfig(Config):
@@ -52,8 +53,18 @@ class FederationConfig(Config):
"Invalid range(s) provided in federation_ip_range_blacklist: %s" % e
)
+ federation_metrics_domains = config.get("federation_metrics_domains") or []
+ validate_config(
+ _METRICS_FOR_DOMAINS_SCHEMA,
+ federation_metrics_domains,
+ ("federation_metrics_domains",),
+ )
+ self.federation_metrics_domains = set(federation_metrics_domains)
+
def generate_config_section(self, config_dir_path, server_name, **kwargs):
return """\
+ ## Federation ##
+
# Restrict federation to the following whitelist of domains.
# N.B. we recommend also firewalling your federation listener to limit
# inbound federation traffic as early as possible, rather than relying
@@ -85,4 +96,18 @@ class FederationConfig(Config):
- '::1/128'
- 'fe80::/64'
- 'fc00::/7'
+
+ # Report prometheus metrics on the age of PDUs being sent to and received from
+ # the following domains. This can be used to give an idea of "delay" on inbound
+ # and outbound federation, though be aware that any delay can be due to problems
+ # at either end or with the intermediate network.
+ #
+ # By default, no domains are monitored in this way.
+ #
+ #federation_metrics_domains:
+ # - matrix.org
+ # - example.com
"""
+
+
+_METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
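
As a standalone illustration of what this schema accepts, the same check can be
run with `jsonschema` directly (which is what `validate_config` uses under the
hood):

    import jsonschema

    # The same shape as _METRICS_FOR_DOMAINS_SCHEMA: a list of strings.
    schema = {"type": "array", "items": {"type": "string"}}

    jsonschema.validate(["matrix.org", "example.com"], schema)  # accepted
    try:
        jsonschema.validate("matrix.org", schema)  # a bare string is rejected
    except jsonschema.ValidationError as e:
        print("rejected:", e.message)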
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 556e291495..be65554524 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -92,5 +92,4 @@ class HomeServerConfig(RootConfig):
TracerConfig,
WorkerConfig,
RedisConfig,
- FederationConfig,
]
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index 70fc8a2f62..f924116819 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -204,6 +204,14 @@ class OIDCConfig(Config):
# If unset, no displayname will be set.
#
#display_name_template: "{{{{ user.given_name }}}} {{{{ user.last_name }}}}"
+
+ # Jinja2 templates for extra attributes to send back to the client during
+ # login.
+ #
+  # Note that these are non-standard, and clients will ignore them unless modified to support them.
+ #
+ #extra_attributes:
+  #  birthdate: "{{{{ user.birthdate }}}}"
""".format(
mapping_provider=DEFAULT_USER_MAPPING_PROVIDER
)
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index e368ea564d..9ddb8b546b 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -471,7 +471,6 @@ class TlsConfig(Config):
# or by checking matrix.org/federationtester/api/report?server_name=$host
#
#tls_fingerprints: [{"sha256": "<base64_encoded_sha256_fingerprint>"}]
-
"""
# Lowercase the string representation of boolean values
% {
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index bf800a3852..dc49df0812 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -23,7 +23,7 @@ from typing import Dict, Optional, Tuple, Type
from unpaddedbase64 import encode_base64
from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken
from synapse.util.caches import intern_dict
from synapse.util.frozenutils import freeze
@@ -118,8 +118,8 @@ class _EventInternalMetadata:
# XXX: These are set by StreamWorkerStore._set_before_and_after.
# I'm pretty sure that these are never persisted to the database, so shouldn't
# be here
- before = DictProperty("before") # type: str
- after = DictProperty("after") # type: str
+ before = DictProperty("before") # type: RoomStreamToken
+ after = DictProperty("after") # type: RoomStreamToken
order = DictProperty("order") # type: Tuple[int, int]
def get_dict(self) -> JsonDict:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 2dcd081cbc..24329dd0e3 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -28,7 +28,7 @@ from typing import (
Union,
)
-from prometheus_client import Counter, Histogram
+from prometheus_client import Counter, Gauge, Histogram
from twisted.internet import defer
from twisted.internet.abstract import isIPAddress
@@ -88,6 +88,13 @@ pdu_process_time = Histogram(
)
+last_pdu_age_metric = Gauge(
+ "synapse_federation_last_received_pdu_age",
+ "The age (in seconds) of the last PDU successfully received from the given domain",
+ labelnames=("server_name",),
+)
+
+
class FederationServer(FederationBase):
def __init__(self, hs):
super().__init__(hs)
@@ -118,6 +125,10 @@ class FederationServer(FederationBase):
hs, "state_ids_resp", timeout_ms=30000
)
+ self._federation_metrics_domains = (
+ hs.get_config().federation.federation_metrics_domains
+ )
+
async def on_backfill_request(
self, origin: str, room_id: str, versions: List[str], limit: int
) -> Tuple[int, Dict[str, Any]]:
@@ -262,7 +273,11 @@ class FederationServer(FederationBase):
pdus_by_room = {} # type: Dict[str, List[EventBase]]
+ newest_pdu_ts = 0
+
for p in transaction.pdus: # type: ignore
+ # FIXME (richardv): I don't think this works:
+ # https://github.com/matrix-org/synapse/issues/8429
if "unsigned" in p:
unsigned = p["unsigned"]
if "age" in unsigned:
@@ -300,6 +315,9 @@ class FederationServer(FederationBase):
event = event_from_pdu_json(p, room_version)
pdus_by_room.setdefault(room_id, []).append(event)
+ if event.origin_server_ts > newest_pdu_ts:
+ newest_pdu_ts = event.origin_server_ts
+
pdu_results = {}
# we can process different rooms in parallel (which is useful if they
@@ -340,6 +358,10 @@ class FederationServer(FederationBase):
process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
)
+ if newest_pdu_ts and origin in self._federation_metrics_domains:
+ newest_pdu_age = self._clock.time_msec() - newest_pdu_ts
+ last_pdu_age_metric.labels(server_name=origin).set(newest_pdu_age / 1000)
+
return pdu_results
async def _handle_edus_in_txn(self, origin: str, transaction: Transaction):
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index c84072ab73..3e07f925e0 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -15,6 +15,8 @@
import logging
from typing import TYPE_CHECKING, List
+from prometheus_client import Gauge
+
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
from synapse.federation.persistence import TransactionActions
@@ -34,6 +36,12 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+last_pdu_age_metric = Gauge(
+ "synapse_federation_last_sent_pdu_age",
+ "The age (in seconds) of the last PDU successfully sent to the given domain",
+ labelnames=("server_name",),
+)
+
class TransactionManager:
"""Helper class which handles building and sending transactions
@@ -48,6 +56,10 @@ class TransactionManager:
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
+ self._federation_metrics_domains = (
+ hs.get_config().federation.federation_metrics_domains
+ )
+
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
@@ -119,6 +131,9 @@ class TransactionManager:
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
+ # FIXME (richardv): I also believe it no longer works. We (now?) store
+ # "age_ts" in "unsigned" rather than at the top level. See
+ # https://github.com/matrix-org/synapse/issues/8429.
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
@@ -167,5 +182,12 @@ class TransactionManager:
)
success = False
+ if success and pdus and destination in self._federation_metrics_domains:
+ last_pdu = pdus[-1]
+ last_pdu_age = self.clock.time_msec() - last_pdu.origin_server_ts
+ last_pdu_age_metric.labels(server_name=destination).set(
+ last_pdu_age / 1000
+ )
+
set_tag(tags.ERROR, not success)
return success
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index dd981c597e..1ce2091b46 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -153,7 +153,7 @@ class AdminHandler(BaseHandler):
if not events:
break
- from_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
+ from_key = events[-1].internal_metadata.after
events = await filter_events_for_client(self.storage, user_id, events)
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 0322b60cfc..00eae92052 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -137,6 +137,15 @@ def login_id_phone_to_thirdparty(identifier: JsonDict) -> Dict[str, str]:
}
+@attr.s(slots=True)
+class SsoLoginExtraAttributes:
+    """Data we track about SSO login sessions"""
+
+ # time the session was created, in milliseconds
+ creation_time = attr.ib(type=int)
+ extra_attributes = attr.ib(type=JsonDict)
+
+
class AuthHandler(BaseHandler):
SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
@@ -239,6 +248,10 @@ class AuthHandler(BaseHandler):
# cast to tuple for use with str.startswith
self._whitelisted_sso_clients = tuple(hs.config.sso_client_whitelist)
+ # A mapping of user ID to extra attributes to include in the login
+ # response.
+ self._extra_attributes = {} # type: Dict[str, SsoLoginExtraAttributes]
+
async def validate_user_via_ui_auth(
self,
requester: Requester,
@@ -1165,6 +1178,7 @@ class AuthHandler(BaseHandler):
registered_user_id: str,
request: SynapseRequest,
client_redirect_url: str,
+ extra_attributes: Optional[JsonDict] = None,
):
"""Having figured out a mxid for this user, complete the HTTP request
@@ -1173,6 +1187,8 @@ class AuthHandler(BaseHandler):
request: The request to complete.
client_redirect_url: The URL to which to redirect the user at the end of the
process.
+ extra_attributes: Extra attributes which will be passed to the client
+ during successful login. Must be JSON serializable.
"""
# If the account has been deactivated, do not proceed with the login
# flow.
@@ -1181,19 +1197,30 @@ class AuthHandler(BaseHandler):
respond_with_html(request, 403, self._sso_account_deactivated_template)
return
- self._complete_sso_login(registered_user_id, request, client_redirect_url)
+ self._complete_sso_login(
+ registered_user_id, request, client_redirect_url, extra_attributes
+ )
def _complete_sso_login(
self,
registered_user_id: str,
request: SynapseRequest,
client_redirect_url: str,
+ extra_attributes: Optional[JsonDict] = None,
):
"""
The synchronous portion of complete_sso_login.
This exists purely for backwards compatibility of synapse.module_api.ModuleApi.
"""
+ # Store any extra attributes which will be passed in the login response.
+        # Note that this is per-user, so it may overwrite a previous value; this
+        # is considered OK since the newest SSO attributes should be the most valid.
+ if extra_attributes:
+ self._extra_attributes[registered_user_id] = SsoLoginExtraAttributes(
+ self._clock.time_msec(), extra_attributes,
+ )
+
# Create a login token
login_token = self.macaroon_gen.generate_short_term_login_token(
registered_user_id
@@ -1226,6 +1253,37 @@ class AuthHandler(BaseHandler):
)
respond_with_html(request, 200, html)
+ async def _sso_login_callback(self, login_result: JsonDict) -> None:
+ """
+ A login callback which might add additional attributes to the login response.
+
+ Args:
+ login_result: The data to be sent to the client. Includes the user
+ ID and access token.
+ """
+ # Expire attributes before processing. Note that there shouldn't be any
+ # valid logins that still have extra attributes.
+ self._expire_sso_extra_attributes()
+
+ extra_attributes = self._extra_attributes.get(login_result["user_id"])
+ if extra_attributes:
+ login_result.update(extra_attributes.extra_attributes)
+
+ def _expire_sso_extra_attributes(self) -> None:
+ """
+ Iterate through the mapping of user IDs to extra attributes and remove any that are no longer valid.
+ """
+ # TODO This should match the amount of time the macaroon is valid for.
+ LOGIN_TOKEN_EXPIRATION_TIME = 2 * 60 * 1000
+ expire_before = self._clock.time_msec() - LOGIN_TOKEN_EXPIRATION_TIME
+ to_expire = set()
+ for user_id, data in self._extra_attributes.items():
+ if data.creation_time < expire_before:
+ to_expire.add(user_id)
+ for user_id in to_expire:
+ logger.debug("Expiring extra attributes for user %s", user_id)
+ del self._extra_attributes[user_id]
+
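
Taken together, `_complete_sso_login`, `_sso_login_callback` and
`_expire_sso_extra_attributes` form a small per-user cache with time-based
expiry: attributes are stashed when the SSO callback completes, and merged into
the login response when the login token is redeemed. A standalone sketch of the
pattern (names and values are illustrative):

    import time

    EXPIRY_MS = 2 * 60 * 1000  # matches the short-term login token lifetime

    _attrs = {}  # user_id -> (creation_time_ms, extra_attributes)

    def stash(user_id, extra):
        _attrs[user_id] = (int(time.time() * 1000), extra)

    def merge_into_login_result(login_result):
        # Expire stale entries first, then merge anything left for this user.
        cutoff = int(time.time() * 1000) - EXPIRY_MS
        for uid in [u for u, (ts, _) in _attrs.items() if ts < cutoff]:
            del _attrs[uid]
        entry = _attrs.get(login_result["user_id"])
        if entry:
            login_result.update(entry[1])

    stash("@alice:example.com", {"birthdate": "1980-01-01"})
    result = {"user_id": "@alice:example.com", "access_token": "..."}
    merge_into_login_result(result)
    # result now also carries {"birthdate": "1980-01-01"}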
@staticmethod
def add_query_param_to_url(url: str, param_name: str, param: Any):
url_parts = list(urllib.parse.urlparse(url))
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 4149520d6c..b9d9098104 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -29,7 +29,6 @@ from synapse.api.errors import (
from synapse.logging.opentracing import log_kv, set_tag, trace
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.types import (
- RoomStreamToken,
StreamToken,
get_domain_from_id,
get_verify_key_from_cross_signing_key,
@@ -113,8 +112,7 @@ class DeviceWorkerHandler(BaseHandler):
set_tag("user_id", user_id)
set_tag("from_token", from_token)
- now_room_id = self.store.get_room_max_stream_ordering()
- now_room_key = RoomStreamToken(None, now_room_id)
+ now_room_key = self.store.get_room_max_token()
room_ids = await self.store.get_rooms_for_user(user_id)
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 0875b74ea8..539b4fc32e 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -133,8 +133,8 @@ class EventStreamHandler(BaseHandler):
chunk = {
"chunk": chunks,
- "start": tokens[0].to_string(),
- "end": tokens[1].to_string(),
+ "start": await tokens[0].to_string(self.store),
+ "end": await tokens[1].to_string(self.store),
}
return chunk
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eb91f2f762..29ec18e25e 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -21,7 +21,7 @@ import itertools
import logging
from collections.abc import Container
from http import HTTPStatus
-from typing import Dict, Iterable, List, Optional, Sequence, Tuple, Union
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Sequence, Tuple, Union
import attr
from signedjson.key import decode_verify_key_bytes
@@ -69,7 +69,7 @@ from synapse.replication.http.federation import (
ReplicationFederationSendEventsRestServlet,
ReplicationStoreRoomOnInviteRestServlet,
)
-from synapse.state import StateResolutionStore, resolve_events_with_store
+from synapse.state import StateResolutionStore
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.types import (
JsonDict,
@@ -85,6 +85,9 @@ from synapse.util.retryutils import NotRetryingDestination
from synapse.util.stringutils import shortstr
from synapse.visibility import filter_events_for_server
+if TYPE_CHECKING:
+ from synapse.server import HomeServer
+
logger = logging.getLogger(__name__)
@@ -116,7 +119,7 @@ class FederationHandler(BaseHandler):
rooms.
"""
- def __init__(self, hs):
+ def __init__(self, hs: "HomeServer"):
super().__init__(hs)
self.hs = hs
@@ -126,6 +129,7 @@ class FederationHandler(BaseHandler):
self.state_store = self.storage.state
self.federation_client = hs.get_federation_client()
self.state_handler = hs.get_state_handler()
+ self._state_resolution_handler = hs.get_state_resolution_handler()
self.server_name = hs.hostname
self.keyring = hs.get_keyring()
self.action_generator = hs.get_action_generator()
@@ -386,8 +390,7 @@ class FederationHandler(BaseHandler):
event_map[x.event_id] = x
room_version = await self.store.get_room_version_id(room_id)
- state_map = await resolve_events_with_store(
- self.clock,
+ state_map = await self._state_resolution_handler.resolve_events_with_store(
room_id,
room_version,
state_maps,
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 8cd7eb22a3..39a85801c1 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -203,8 +203,8 @@ class InitialSyncHandler(BaseHandler):
messages, time_now=time_now, as_client_event=as_client_event
)
),
- "start": start_token.to_string(),
- "end": end_token.to_string(),
+ "start": await start_token.to_string(self.store),
+ "end": await end_token.to_string(self.store),
}
d["state"] = await self._event_serializer.serialize_events(
@@ -249,7 +249,7 @@ class InitialSyncHandler(BaseHandler):
],
"account_data": account_data_events,
"receipts": receipt,
- "end": now_token.to_string(),
+ "end": await now_token.to_string(self.store),
}
return ret
@@ -325,7 +325,8 @@ class InitialSyncHandler(BaseHandler):
if limit is None:
limit = 10
- stream_token = await self.store.get_stream_token_for_event(member_event_id)
+ leave_position = await self.store.get_position_for_event(member_event_id)
+ stream_token = leave_position.to_room_stream_token()
messages, token = await self.store.get_recent_events_for_room(
room_id, limit=limit, end_token=stream_token
@@ -347,8 +348,8 @@ class InitialSyncHandler(BaseHandler):
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
- "start": start_token.to_string(),
- "end": end_token.to_string(),
+ "start": await start_token.to_string(self.store),
+ "end": await end_token.to_string(self.store),
},
"state": (
await self._event_serializer.serialize_events(
@@ -446,8 +447,8 @@ class InitialSyncHandler(BaseHandler):
"chunk": (
await self._event_serializer.serialize_events(messages, time_now)
),
- "start": start_token.to_string(),
- "end": end_token.to_string(),
+ "start": await start_token.to_string(self.store),
+ "end": await end_token.to_string(self.store),
},
"state": state,
"presence": presence,
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 0e06e4408d..19cd652675 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -37,7 +37,7 @@ from synapse.config import ConfigError
from synapse.http.server import respond_with_html
from synapse.http.site import SynapseRequest
from synapse.logging.context import make_deferred_yieldable
-from synapse.types import UserID, map_username_to_mxid_localpart
+from synapse.types import JsonDict, UserID, map_username_to_mxid_localpart
from synapse.util import json_decoder
if TYPE_CHECKING:
@@ -707,6 +707,15 @@ class OidcHandler:
self._render_error(request, "mapping_error", str(e))
return
+ # Mapping providers might not have get_extra_attributes: only call this
+ # method if it exists.
+ extra_attributes = None
+ get_extra_attributes = getattr(
+ self._user_mapping_provider, "get_extra_attributes", None
+ )
+ if get_extra_attributes:
+ extra_attributes = await get_extra_attributes(userinfo, token)
+
# and finally complete the login
if ui_auth_session_id:
await self._auth_handler.complete_sso_ui_auth(
@@ -714,7 +723,7 @@ class OidcHandler:
)
else:
await self._auth_handler.complete_sso_login(
- user_id, request, client_redirect_url
+ user_id, request, client_redirect_url, extra_attributes
)
def _generate_oidc_session_token(
@@ -984,7 +993,7 @@ class OidcMappingProvider(Generic[C]):
async def map_user_attributes(
self, userinfo: UserInfo, token: Token
) -> UserAttribute:
- """Map a ``UserInfo`` objects into user attributes.
+ """Map a `UserInfo` object into user attributes.
Args:
userinfo: An object representing the user given by the OIDC provider
@@ -995,6 +1004,18 @@ class OidcMappingProvider(Generic[C]):
"""
raise NotImplementedError()
+ async def get_extra_attributes(self, userinfo: UserInfo, token: Token) -> JsonDict:
+ """Map a `UserInfo` object into additional attributes passed to the client during login.
+
+ Args:
+ userinfo: An object representing the user given by the OIDC provider
+ token: A dict with the tokens returned by the provider
+
+ Returns:
+ A dict containing additional attributes. Must be JSON serializable.
+ """
+ return {}
+
# Used to clear out "None" values in templates
def jinja_finalize(thing):
@@ -1009,6 +1030,7 @@ class JinjaOidcMappingConfig:
subject_claim = attr.ib() # type: str
localpart_template = attr.ib() # type: Template
display_name_template = attr.ib() # type: Optional[Template]
+ extra_attributes = attr.ib() # type: Dict[str, Template]
class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
@@ -1047,10 +1069,28 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
% (e,)
)
+ extra_attributes = {} # type: Dict[str, Template]
+ if "extra_attributes" in config:
+ extra_attributes_config = config.get("extra_attributes") or {}
+ if not isinstance(extra_attributes_config, dict):
+ raise ConfigError(
+ "oidc_config.user_mapping_provider.config.extra_attributes must be a dict"
+ )
+
+ for key, value in extra_attributes_config.items():
+ try:
+ extra_attributes[key] = env.from_string(value)
+ except Exception as e:
+ raise ConfigError(
+ "invalid jinja template for oidc_config.user_mapping_provider.config.extra_attributes.%s: %r"
+ % (key, e)
+ )
+
return JinjaOidcMappingConfig(
subject_claim=subject_claim,
localpart_template=localpart_template,
display_name_template=display_name_template,
+ extra_attributes=extra_attributes,
)
def get_remote_user_id(self, userinfo: UserInfo) -> str:
@@ -1071,3 +1111,13 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
display_name = None
return UserAttribute(localpart=localpart, display_name=display_name)
+
+ async def get_extra_attributes(self, userinfo: UserInfo, token: Token) -> JsonDict:
+ extras = {} # type: Dict[str, str]
+ for key, template in self._config.extra_attributes.items():
+ try:
+ extras[key] = template.render(user=userinfo).strip()
+ except Exception as e:
+ # Log an error and skip this value (don't break login for this).
+ logger.error("Failed to render OIDC extra attribute %s: %s" % (key, e))
+ return extras
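
The Jinja half of the new hook is easiest to see in isolation. Below is a minimal, standalone sketch of what `get_extra_attributes` does with the templates parsed from the `extra_attributes` config; the claim name and template are invented for illustration:

    # Standalone sketch, not Synapse code itself; "phone" is a hypothetical claim.
    from jinja2 import Environment

    env = Environment()
    # as parsed from oidc_config.user_mapping_provider.config.extra_attributes
    templates = {"phone": env.from_string("{{ user.phone_number }}")}

    userinfo = {"phone_number": "+1-555-0100"}
    extras = {key: t.render(user=userinfo).strip() for key, t in templates.items()}
    print(extras)  # {'phone': '+1-555-0100'}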
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index a0b3bdb5e0..2c2a633938 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -25,7 +25,7 @@ from synapse.logging.context import run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.state import StateFilter
from synapse.streams.config import PaginationConfig
-from synapse.types import Requester, RoomStreamToken
+from synapse.types import Requester
from synapse.util.async_helpers import ReadWriteLock
from synapse.util.stringutils import random_string
from synapse.visibility import filter_events_for_client
@@ -373,10 +373,9 @@ class PaginationHandler:
# case "JOIN" would have been returned.
assert member_event_id
- leave_token_str = await self.store.get_topological_token_for_event(
+ leave_token = await self.store.get_topological_token_for_event(
member_event_id
)
- leave_token = RoomStreamToken.parse(leave_token_str)
assert leave_token.topological is not None
if leave_token.topological < curr_topo:
@@ -414,8 +413,8 @@ class PaginationHandler:
if not events:
return {
"chunk": [],
- "start": from_token.to_string(),
- "end": next_token.to_string(),
+ "start": await from_token.to_string(self.store),
+ "end": await next_token.to_string(self.store),
}
state = None
@@ -443,8 +442,8 @@ class PaginationHandler:
events, time_now, as_client_event=as_client_event
)
),
- "start": from_token.to_string(),
- "end": next_token.to_string(),
+ "start": await from_token.to_string(self.store),
+ "end": await next_token.to_string(self.store),
}
if state:
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 5714ba519d..1333999782 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1107,11 +1107,13 @@ class RoomContextHandler:
# the token, which we replace.
token = StreamToken.START
- results["start"] = token.copy_and_replace(
+ results["start"] = await token.copy_and_replace(
"room_key", results["start"]
- ).to_string()
+ ).to_string(self.store)
- results["end"] = token.copy_and_replace("room_key", results["end"]).to_string()
+ results["end"] = await token.copy_and_replace(
+ "room_key", results["end"]
+ ).to_string(self.store)
return results
@@ -1164,14 +1166,14 @@ class RoomEventSource:
events[:] = events[:limit]
if events:
- end_key = RoomStreamToken.parse(events[-1].internal_metadata.after)
+ end_key = events[-1].internal_metadata.after
else:
end_key = to_key
return (events, end_key)
def get_current_key(self) -> RoomStreamToken:
- return RoomStreamToken(None, self.store.get_room_max_stream_ordering())
+ return self.store.get_room_max_token()
def get_current_key_for_room(self, room_id: str) -> Awaitable[str]:
return self.store.get_room_events_max_id(room_id)
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index 6a76c20d79..e9402e6e2e 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -362,13 +362,13 @@ class SearchHandler(BaseHandler):
self.storage, user.to_string(), res["events_after"]
)
- res["start"] = now_token.copy_and_replace(
+ res["start"] = await now_token.copy_and_replace(
"room_key", res["start"]
- ).to_string()
+ ).to_string(self.store)
- res["end"] = now_token.copy_and_replace(
+ res["end"] = await now_token.copy_and_replace(
"room_key", res["end"]
- ).to_string()
+ ).to_string(self.store)
if include_profile:
senders = {
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index e948efef2e..bfe2583002 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -519,7 +519,7 @@ class SyncHandler:
if len(recents) > timeline_limit:
limited = True
recents = recents[-timeline_limit:]
- room_key = RoomStreamToken.parse(recents[0].internal_metadata.before)
+ room_key = recents[0].internal_metadata.before
prev_batch_token = now_token.copy_and_replace("room_key", room_key)
@@ -1595,16 +1595,24 @@ class SyncHandler:
if leave_events:
leave_event = leave_events[-1]
- leave_stream_token = await self.store.get_stream_token_for_event(
+ leave_position = await self.store.get_position_for_event(
leave_event.event_id
)
- leave_token = since_token.copy_and_replace(
- "room_key", leave_stream_token
- )
- if since_token and since_token.is_after(leave_token):
+ # If the leave event happened before the since token then we
+ # bail.
+ if since_token and not leave_position.persisted_after(
+ since_token.room_key
+ ):
continue
+ # We can safely convert the position of the leave event into a
+ # stream token as it'll only be used in the context of this
+ # room. (c.f. the docstring of `to_room_stream_token`).
+ leave_token = since_token.copy_and_replace(
+ "room_key", leave_position.to_room_stream_token()
+ )
+
# If this is an out of band message, like a remote invite
# rejection, we include it in the recents batch. Otherwise, we
# let _load_filtered_recents handle fetching the correct
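
The safety argument in the comment above rests on the semantics of `PersistedEventPosition`; a toy illustration of the two methods used here (instance name and orderings are made up):

    from synapse.types import PersistedEventPosition, RoomStreamToken

    pos = PersistedEventPosition("master", 42)
    assert pos.persisted_after(RoomStreamToken(None, 41))
    assert not pos.persisted_after(RoomStreamToken(None, 42))

    # Only safe for comparisons within the same room, per the docstring.
    leave_token = pos.to_room_stream_token()  # == RoomStreamToken(None, 42)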
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a1f7ca3449..b8d2a8e8a9 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -15,6 +15,7 @@
import functools
import gc
+import itertools
import logging
import os
import platform
@@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import (
REGISTRY,
CounterMetricFamily,
+ GaugeHistogramMetricFamily,
GaugeMetricFamily,
- HistogramMetricFamily,
)
from twisted.internet import reactor
@@ -46,7 +47,7 @@ logger = logging.getLogger(__name__)
METRICS_PREFIX = "/_synapse/metrics"
running_on_pypy = platform.python_implementation() == "PyPy"
-all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]]
+all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]]
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
@@ -205,63 +206,83 @@ class InFlightGauge:
all_gauges[self.name] = self
-@attr.s(slots=True, hash=True)
-class BucketCollector:
- """
- Like a Histogram, but allows buckets to be point-in-time instead of
- incrementally added to.
+class GaugeBucketCollector:
+ """Like a Histogram, but the buckets are Gauges which are updated atomically.
- Args:
- name (str): Base name of metric to be exported to Prometheus.
- data_collector (callable -> dict): A synchronous callable that
- returns a dict mapping bucket to number of items in the
- bucket. If these buckets are not the same as the buckets
- given to this class, they will be remapped into them.
- buckets (list[float]): List of floats/ints of the buckets to
- give to Prometheus. +Inf is ignored, if given.
+ The data is updated by calling `update_data` with an iterable of measurements.
+ We assume that the data is updated less frequently than it is reported to
+ Prometheus, and optimise for that case.
"""
- name = attr.ib()
- data_collector = attr.ib()
- buckets = attr.ib()
+ __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric")
- def collect(self):
+ def __init__(
+ self,
+ name: str,
+ documentation: str,
+ buckets: Iterable[float],
+ registry=REGISTRY,
+ ):
+ """
+ Args:
+ name: base name of metric to be exported to Prometheus. (a _bucket suffix
+ will be added.)
+ documentation: help text for the metric
+ buckets: The top bounds of the buckets to report
+ registry: metric registry to register with
+ """
+ self._name = name
+ self._documentation = documentation
- # Fetch the data -- this must be synchronous!
- data = self.data_collector()
+ # the tops of the buckets
+ self._bucket_bounds = [float(b) for b in buckets]
+ if self._bucket_bounds != sorted(self._bucket_bounds):
+ raise ValueError("Buckets not in sorted order")
- buckets = {} # type: Dict[float, int]
+ if self._bucket_bounds[-1] != float("inf"):
+ self._bucket_bounds.append(float("inf"))
- res = []
- for x in data.keys():
- for i, bound in enumerate(self.buckets):
- if x <= bound:
- buckets[bound] = buckets.get(bound, 0) + data[x]
+ self._metric = self._values_to_metric([])
+ registry.register(self)
- for i in self.buckets:
- res.append([str(i), buckets.get(i, 0)])
+ def collect(self):
+ yield self._metric
- res.append(["+Inf", sum(data.values())])
+ def update_data(self, values: Iterable[float]):
+ """Update the data to be reported by the metric
- metric = HistogramMetricFamily(
- self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items())
+ The existing data is cleared, and each measurement in the input is assigned
+ to the relevant bucket.
+ """
+ self._metric = self._values_to_metric(values)
+
+ def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily:
+ total = 0.0
+ bucket_values = [0 for _ in self._bucket_bounds]
+
+ for v in values:
+ # assign each value to a bucket
+ for i, bound in enumerate(self._bucket_bounds):
+ if v <= bound:
+ bucket_values[i] += 1
+ break
+
+ # ... and increment the sum
+ total += v
+
+ # now, aggregate the bucket values so that they count the number of entries in
+ # that bucket or below.
+ accumulated_values = itertools.accumulate(bucket_values)
+
+ return GaugeHistogramMetricFamily(
+ self._name,
+ self._documentation,
+ buckets=list(
+ zip((str(b) for b in self._bucket_bounds), accumulated_values)
+ ),
+ gsum_value=total,
)
- yield metric
-
- def __attrs_post_init__(self):
- self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
- if self.buckets != sorted(self.buckets):
- raise ValueError("Buckets not sorted")
-
- self.buckets = tuple(self.buckets)
-
- if self.name in all_gauges.keys():
- logger.warning("%s already registered, reregistering" % (self.name,))
- REGISTRY.unregister(all_gauges.pop(self.name))
-
- REGISTRY.register(self)
- all_gauges[self.name] = self
#
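
A minimal usage sketch of the replacement collector (the metric name and measurements are invented):

    from synapse.metrics import GaugeBucketCollector

    demo = GaugeBucketCollector(
        "synapse_demo_room_sizes",
        "Number of rooms with the given number of members or fewer",
        buckets=[10, 100, 1000],  # an +Inf bucket is appended automatically
    )

    # Each call atomically replaces the previous snapshot; values above the
    # largest bound land in the +Inf bucket.
    demo.update_data([3, 42, 5000])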
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index 4304c60d56..734271e765 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -24,9 +24,9 @@ expect, and the newer "best practice" version of the up-to-date official client.
import math
import threading
-from collections import namedtuple
from http.server import BaseHTTPRequestHandler, HTTPServer
from socketserver import ThreadingMixIn
+from typing import Dict, List
from urllib.parse import parse_qs, urlparse
from prometheus_client import REGISTRY
@@ -35,14 +35,6 @@ from twisted.web.resource import Resource
from synapse.util import caches
-try:
- from prometheus_client.samples import Sample
-except ImportError:
- Sample = namedtuple( # type: ignore[no-redef] # noqa
- "Sample", ["name", "labels", "value", "timestamp", "exemplar"]
- )
-
-
CONTENT_TYPE_LATEST = str("text/plain; version=0.0.4; charset=utf-8")
@@ -93,17 +85,6 @@ def sample_line(line, name):
)
-def nameify_sample(sample):
- """
- If we get a prometheus_client<0.4.0 sample as a tuple, transform it into a
- namedtuple which has the names we expect.
- """
- if not isinstance(sample, Sample):
- sample = Sample(*sample, None, None)
-
- return sample
-
-
def generate_latest(registry, emit_help=False):
# Trigger the cache metrics to be rescraped, which updates the common
@@ -144,16 +125,33 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mname, mtype))
- for sample in map(nameify_sample, metric.samples):
- # Get rid of the OpenMetrics specific samples
+
+ om_samples = {} # type: Dict[str, List[str]]
+ for s in metric.samples:
for suffix in ["_created", "_gsum", "_gcount"]:
- if sample.name.endswith(suffix):
+ if s.name == metric.name + suffix:
+ # OpenMetrics specific sample, put in a gauge at the end.
+ # (these come from gaugehistograms which don't get renamed,
+ # so no need to faff with mnewname)
+ om_samples.setdefault(suffix, []).append(sample_line(s, s.name))
break
else:
- newname = sample.name.replace(mnewname, mname)
+ newname = s.name.replace(mnewname, mname)
if ":" in newname and newname.endswith("_total"):
newname = newname[: -len("_total")]
- output.append(sample_line(sample, newname))
+ output.append(sample_line(s, newname))
+
+ for suffix, lines in sorted(om_samples.items()):
+ if emit_help:
+ output.append(
+ "# HELP {0}{1} {2}\n".format(
+ metric.name,
+ suffix,
+ metric.documentation.replace("\\", r"\\").replace("\n", r"\n"),
+ )
+ )
+ output.append("# TYPE {0}{1} gauge\n".format(metric.name, suffix))
+ output.extend(lines)
# Get rid of the weird colon things while we're at it
if mtype == "counter":
@@ -172,16 +170,16 @@ def generate_latest(registry, emit_help=False):
)
)
output.append("# TYPE {0} {1}\n".format(mnewname, mtype))
- for sample in map(nameify_sample, metric.samples):
- # Get rid of the OpenMetrics specific samples
+
+ for s in metric.samples:
+ # Get rid of the OpenMetrics specific samples (we should already have
+ # dealt with them above anyway.)
for suffix in ["_created", "_gsum", "_gcount"]:
- if sample.name.endswith(suffix):
+ if s.name == metric.name + suffix:
break
else:
output.append(
- sample_line(
- sample, sample.name.replace(":total", "").replace(":", "_")
- )
+ sample_line(s, s.name.replace(":total", "").replace(":", "_"))
)
return "".join(output).encode("utf-8")
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 441b3d15e2..59415f6f88 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -163,7 +163,7 @@ class _NotifierUserStream:
"""
# Immediately wake up stream if something has already happened
# since their last token.
- if self.last_notified_token.is_after(token):
+ if self.last_notified_token != token:
return _NotificationListener(defer.succeed(self.current_token))
else:
return _NotificationListener(self.notify_deferred.observe())
@@ -470,7 +470,7 @@ class Notifier:
async def check_for_updates(
before_token: StreamToken, after_token: StreamToken
) -> EventStreamResult:
- if not after_token.is_after(before_token):
+ if after_token == before_token:
return EventStreamResult([], (from_token, from_token))
events = [] # type: List[EventBase]
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 288631477e..0ddead8a0f 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -68,7 +68,11 @@ REQUIREMENTS = [
"pymacaroons>=0.13.0",
"msgpack>=0.5.2",
"phonenumbers>=8.2.0",
- "prometheus_client>=0.0.18,<0.9.0",
+ # we use GaugeHistogramMetric, which was added in prom-client 0.4.0.
+ # prom-client has a history of breaking backwards compatibility between
+ # minor versions (https://github.com/prometheus/client_python/issues/317),
+ # so we also pin the minor version.
+ "prometheus_client>=0.4.0,<0.9.0",
# we use attr.validators.deep_iterable, which arrived in 19.1.0 (Note:
# Fedora 31 only has 19.1, so if we want to upgrade we should wait until 33
# is out in November.)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 55af3d41ea..e165429cad 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,7 +29,7 @@ from synapse.replication.tcp.streams.events import (
EventsStreamEventRow,
EventsStreamRow,
)
-from synapse.types import PersistedEventPosition, RoomStreamToken, UserID
+from synapse.types import PersistedEventPosition, UserID
from synapse.util.async_helpers import timeout_deferred
from synapse.util.metrics import Measure
@@ -152,9 +152,7 @@ class ReplicationDataHandler:
if event.type == EventTypes.Member:
extra_users = (UserID.from_string(event.state_key),)
- max_token = RoomStreamToken(
- None, self.store.get_room_max_stream_ordering()
- )
+ max_token = self.store.get_room_max_token()
event_pos = PersistedEventPosition(instance_name, token)
self.notifier.on_new_room_event(
event, event_pos, max_token, extra_users
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 5c5f00b213..57cac22252 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -109,7 +109,8 @@ class PurgeHistoryRestServlet(RestServlet):
if event.room_id != room_id:
raise SynapseError(400, "Event is for wrong room.")
- token = await self.store.get_topological_token_for_event(event_id)
+ room_token = await self.store.get_topological_token_for_event(event_id)
+ token = await room_token.to_string(self.store)
logger.info("[purge] purging up to token %s (event_id %s)", token, event_id)
elif "purge_up_to_ts" in body:
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index 985d994f6b..1ecb77aa26 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -33,6 +33,7 @@ class EventStreamRestServlet(RestServlet):
super().__init__()
self.event_stream_handler = hs.get_event_stream_handler()
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
@@ -44,7 +45,7 @@ class EventStreamRestServlet(RestServlet):
if b"room_id" in request.args:
room_id = request.args[b"room_id"][0].decode("ascii")
- pagin_config = PaginationConfig.from_request(request)
+ pagin_config = await PaginationConfig.from_request(self.store, request)
timeout = EventStreamRestServlet.DEFAULT_LONGPOLL_TIME_MS
if b"timeout" in request.args:
try:
diff --git a/synapse/rest/client/v1/initial_sync.py b/synapse/rest/client/v1/initial_sync.py
index d7042786ce..91da0ee573 100644
--- a/synapse/rest/client/v1/initial_sync.py
+++ b/synapse/rest/client/v1/initial_sync.py
@@ -27,11 +27,12 @@ class InitialSyncRestServlet(RestServlet):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request)
as_client_event = b"raw" not in request.args
- pagination_config = PaginationConfig.from_request(request)
+ pagination_config = await PaginationConfig.from_request(self.store, request)
include_archived = parse_boolean(request, "archived", default=False)
content = await self.initial_sync_handler.snapshot_all_rooms(
user_id=requester.user.to_string(),
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index 250b03a025..b9347b87c7 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -284,9 +284,7 @@ class LoginRestServlet(RestServlet):
self,
user_id: str,
login_submission: JsonDict,
- callback: Optional[
- Callable[[Dict[str, str]], Awaitable[Dict[str, str]]]
- ] = None,
+ callback: Optional[Callable[[Dict[str, str]], Awaitable[None]]] = None,
create_non_existent_users: bool = False,
) -> Dict[str, str]:
"""Called when we've successfully authed the user and now need to
@@ -299,12 +297,12 @@ class LoginRestServlet(RestServlet):
Args:
user_id: ID of the user to register.
login_submission: Dictionary of login information.
- callback: Callback function to run after registration.
+ callback: Callback function to run after login.
create_non_existent_users: Whether to create the user if they don't
exist. Defaults to False.
Returns:
- result: Dictionary of account information after successful registration.
+ result: Dictionary of account information after successful login.
"""
# Before we actually log them in we check if they've already logged in
@@ -339,14 +337,24 @@ class LoginRestServlet(RestServlet):
return result
async def _do_token_login(self, login_submission: JsonDict) -> Dict[str, str]:
+ """
+ Handle the final stage of SSO login.
+
+ Args:
+ login_submission: The JSON request body.
+
+ Returns:
+ The body of the JSON response.
+ """
token = login_submission["token"]
auth_handler = self.auth_handler
user_id = await auth_handler.validate_short_term_login_token_and_get_user_id(
token
)
- result = await self._complete_login(user_id, login_submission)
- return result
+ return await self._complete_login(
+ user_id, login_submission, self.auth_handler._sso_login_callback
+ )
async def _do_jwt_login(self, login_submission: JsonDict) -> Dict[str, str]:
token = login_submission.get("token", None)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index b76811cc3b..b421fe855e 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -451,6 +451,7 @@ class RoomMemberListRestServlet(RestServlet):
super().__init__()
self.message_handler = hs.get_message_handler()
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
async def on_GET(self, request, room_id):
# TODO support Pagination stream API (limit/tokens)
@@ -465,7 +466,7 @@ class RoomMemberListRestServlet(RestServlet):
if at_token_string is None:
at_token = None
else:
- at_token = StreamToken.from_string(at_token_string)
+ at_token = await StreamToken.from_string(self.store, at_token_string)
# let you filter down on particular memberships.
# XXX: this may not be the best shape for this API - we could pass in a filter
@@ -521,10 +522,13 @@ class RoomMessageListRestServlet(RestServlet):
super().__init__()
self.pagination_handler = hs.get_pagination_handler()
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
- pagination_config = PaginationConfig.from_request(request, default_limit=10)
+ pagination_config = await PaginationConfig.from_request(
+ self.store, request, default_limit=10
+ )
as_client_event = b"raw" not in request.args
filter_str = parse_string(request, b"filter", encoding="utf-8")
if filter_str:
@@ -580,10 +584,11 @@ class RoomInitialSyncRestServlet(RestServlet):
super().__init__()
self.initial_sync_handler = hs.get_initial_sync_handler()
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
async def on_GET(self, request, room_id):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
- pagination_config = PaginationConfig.from_request(request)
+ pagination_config = await PaginationConfig.from_request(self.store, request)
content = await self.initial_sync_handler.room_initial_sync(
room_id=room_id, requester=requester, pagin_config=pagination_config
)
diff --git a/synapse/rest/client/v2_alpha/keys.py b/synapse/rest/client/v2_alpha/keys.py
index 7abd6ff333..55c4606569 100644
--- a/synapse/rest/client/v2_alpha/keys.py
+++ b/synapse/rest/client/v2_alpha/keys.py
@@ -180,6 +180,7 @@ class KeyChangesServlet(RestServlet):
super().__init__()
self.auth = hs.get_auth()
self.device_handler = hs.get_device_handler()
+ self.store = hs.get_datastore()
async def on_GET(self, request):
requester = await self.auth.get_user_by_req(request, allow_guest=True)
@@ -191,7 +192,7 @@ class KeyChangesServlet(RestServlet):
# changes after the "to" as well as before.
set_tag("to", parse_string(request, "to"))
- from_token = StreamToken.from_string(from_token_string)
+ from_token = await StreamToken.from_string(self.store, from_token_string)
user_id = requester.user.to_string()
diff --git a/synapse/rest/client/v2_alpha/sync.py b/synapse/rest/client/v2_alpha/sync.py
index 51e395cc64..6779df952f 100644
--- a/synapse/rest/client/v2_alpha/sync.py
+++ b/synapse/rest/client/v2_alpha/sync.py
@@ -77,6 +77,7 @@ class SyncRestServlet(RestServlet):
super().__init__()
self.hs = hs
self.auth = hs.get_auth()
+ self.store = hs.get_datastore()
self.sync_handler = hs.get_sync_handler()
self.clock = hs.get_clock()
self.filtering = hs.get_filtering()
@@ -151,10 +152,9 @@ class SyncRestServlet(RestServlet):
device_id=device_id,
)
+ since_token = None
if since is not None:
- since_token = StreamToken.from_string(since)
- else:
- since_token = None
+ since_token = await StreamToken.from_string(self.store, since)
# send any outstanding server notices to the user.
await self._server_notices_sender.on_user_syncing(user.to_string())
@@ -236,7 +236,7 @@ class SyncRestServlet(RestServlet):
"leave": sync_result.groups.leave,
},
"device_one_time_keys_count": sync_result.device_one_time_keys_count,
- "next_batch": sync_result.next_batch.to_string(),
+ "next_batch": await sync_result.next_batch.to_string(self.store),
}
@staticmethod
@@ -413,7 +413,7 @@ class SyncRestServlet(RestServlet):
result = {
"timeline": {
"events": serialized_timeline,
- "prev_batch": room.timeline.prev_batch.to_string(),
+ "prev_batch": await room.timeline.prev_batch.to_string(self.store),
"limited": room.timeline.limited,
},
"state": {"events": serialized_state},
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 5a5ea39e01..31082bb16a 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -13,42 +13,46 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import heapq
import logging
-from collections import namedtuple
+from collections import defaultdict, namedtuple
from typing import (
+ Any,
Awaitable,
+ Callable,
+ DefaultDict,
Dict,
Iterable,
List,
Optional,
Sequence,
Set,
+ Tuple,
Union,
overload,
)
import attr
from frozendict import frozendict
-from prometheus_client import Histogram
+from prometheus_client import Counter, Histogram
from typing_extensions import Literal
from synapse.api.constants import EventTypes
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions
from synapse.events import EventBase
from synapse.events.snapshot import EventContext
+from synapse.logging.context import ContextResourceUsage
from synapse.logging.utils import log_function
from synapse.state import v1, v2
from synapse.storage.databases.main.events_worker import EventRedactBehaviour
from synapse.storage.roommember import ProfileInfo
from synapse.types import Collection, StateMap
-from synapse.util import Clock
from synapse.util.async_helpers import Linearizer
from synapse.util.caches.expiringcache import ExpiringCache
from synapse.util.metrics import Measure, measure_func
logger = logging.getLogger(__name__)
-
+metrics_logger = logging.getLogger("synapse.state.metrics")
# Metrics for number of state groups involved in a resolution.
state_groups_histogram = Histogram(
@@ -448,19 +452,44 @@ class StateHandler:
state_map = {ev.event_id: ev for st in state_sets for ev in st}
- with Measure(self.clock, "state._resolve_events"):
- new_state = await resolve_events_with_store(
- self.clock,
- event.room_id,
- room_version,
- state_set_ids,
- event_map=state_map,
- state_res_store=StateResolutionStore(self.store),
- )
+ new_state = await self._state_resolution_handler.resolve_events_with_store(
+ event.room_id,
+ room_version,
+ state_set_ids,
+ event_map=state_map,
+ state_res_store=StateResolutionStore(self.store),
+ )
return {key: state_map[ev_id] for key, ev_id in new_state.items()}
+@attr.s(slots=True)
+class _StateResMetrics:
+ """Keeps track of some usage metrics about state res."""
+
+ # System and User CPU time, in seconds
+ cpu_time = attr.ib(type=float, default=0.0)
+
+ # time spent on database transactions (excluding scheduling time). This roughly
+ # corresponds to the amount of work done on the db server, excluding event fetches.
+ db_time = attr.ib(type=float, default=0.0)
+
+ # number of events fetched from the db.
+ db_events = attr.ib(type=int, default=0)
+
+
+_biggest_room_by_cpu_counter = Counter(
+ "synapse_state_res_cpu_for_biggest_room_seconds",
+ "CPU time spent performing state resolution for the single most expensive "
+ "room for state resolution",
+)
+_biggest_room_by_db_counter = Counter(
+ "synapse_state_res_db_for_biggest_room_seconds",
+ "Database time spent performing state resolution for the single most "
+ "expensive room for state resolution",
+)
+
+
class StateResolutionHandler:
"""Responsible for doing state conflict resolution.
@@ -483,6 +512,17 @@ class StateResolutionHandler:
reset_expiry_on_get=True,
)
+ #
+ # stuff for tracking time spent on state-res by room
+ #
+
+ # tracks the amount of work done on state res per room
+ self._state_res_metrics = defaultdict(
+ _StateResMetrics
+ ) # type: DefaultDict[str, _StateResMetrics]
+
+ self.clock.looping_call(self._report_metrics, 120 * 1000)
+
@log_function
async def resolve_state_groups(
self,
@@ -530,15 +570,13 @@ class StateResolutionHandler:
state_groups_histogram.observe(len(state_groups_ids))
- with Measure(self.clock, "state._resolve_events"):
- new_state = await resolve_events_with_store(
- self.clock,
- room_id,
- room_version,
- list(state_groups_ids.values()),
- event_map=event_map,
- state_res_store=state_res_store,
- )
+ new_state = await self.resolve_events_with_store(
+ room_id,
+ room_version,
+ list(state_groups_ids.values()),
+ event_map=event_map,
+ state_res_store=state_res_store,
+ )
# if the new state matches any of the input state groups, we can
# use that state group again. Otherwise we will generate a state_id
@@ -552,6 +590,114 @@ class StateResolutionHandler:
return cache
+ async def resolve_events_with_store(
+ self,
+ room_id: str,
+ room_version: str,
+ state_sets: Sequence[StateMap[str]],
+ event_map: Optional[Dict[str, EventBase]],
+ state_res_store: "StateResolutionStore",
+ ) -> StateMap[str]:
+ """
+ Args:
+ room_id: the room we are working in
+
+ room_version: Version of the room
+
+ state_sets: List of dicts of (type, state_key) -> event_id,
+ which are the different state groups to resolve.
+
+ event_map:
+ a dict from event_id to event, for any events that we happen to
+ have in flight (eg, those currently being persisted). This will be
+ used as a starting point for finding the state we need; any missing
+ events will be requested via state_map_factory.
+
+ If None, all events will be fetched via state_res_store.
+
+ state_res_store: a place to fetch events from
+
+ Returns:
+ a map from (type, state_key) to event_id.
+ """
+ try:
+ with Measure(self.clock, "state._resolve_events") as m:
+ v = KNOWN_ROOM_VERSIONS[room_version]
+ if v.state_res == StateResolutionVersions.V1:
+ return await v1.resolve_events_with_store(
+ room_id, state_sets, event_map, state_res_store.get_events
+ )
+ else:
+ return await v2.resolve_events_with_store(
+ self.clock,
+ room_id,
+ room_version,
+ state_sets,
+ event_map,
+ state_res_store,
+ )
+ finally:
+ self._record_state_res_metrics(room_id, m.get_resource_usage())
+
+ def _record_state_res_metrics(self, room_id: str, rusage: ContextResourceUsage):
+ room_metrics = self._state_res_metrics[room_id]
+ room_metrics.cpu_time += rusage.ru_utime + rusage.ru_stime
+ room_metrics.db_time += rusage.db_txn_duration_sec
+ room_metrics.db_events += rusage.evt_db_fetch_count
+
+ def _report_metrics(self):
+ if not self._state_res_metrics:
+ # no state res has happened since the last iteration: don't bother logging.
+ return
+
+ self._report_biggest(
+ lambda i: i.cpu_time, "CPU time", _biggest_room_by_cpu_counter,
+ )
+
+ self._report_biggest(
+ lambda i: i.db_time, "DB time", _biggest_room_by_db_counter,
+ )
+
+ self._state_res_metrics.clear()
+
+ def _report_biggest(
+ self,
+ extract_key: Callable[[_StateResMetrics], Any],
+ metric_name: str,
+ prometheus_counter_metric: Counter,
+ ) -> None:
+ """Report metrics on the biggest rooms for state res
+
+ Args:
+ extract_key: a callable which, given a _StateResMetrics, extracts a single
+ metric to sort by.
+ metric_name: the name of the metric we have extracted, for the log line
+ prometheus_counter_metric: a prometheus metric recording the sum of
+ the extracted metric
+ """
+ n_to_log = 10
+ if not metrics_logger.isEnabledFor(logging.DEBUG):
+ # only need the most expensive if we don't have debug logging, which
+ # allows nlargest() to degrade to max()
+ n_to_log = 1
+
+ items = self._state_res_metrics.items()
+
+ # log the N biggest rooms
+ biggest = heapq.nlargest(
+ n_to_log, items, key=lambda i: extract_key(i[1])
+ ) # type: List[Tuple[str, _StateResMetrics]]
+ metrics_logger.debug(
+ "%i biggest rooms for state-res by %s: %s",
+ len(biggest),
+ metric_name,
+ ["%s (%gs)" % (r, extract_key(m)) for (r, m) in biggest],
+ )
+
+ # report info on the single biggest to prometheus
+ _, biggest_metrics = biggest[0]
+ prometheus_counter_metric.inc(extract_key(biggest_metrics))
+
def _make_state_cache_entry(
new_state: StateMap[str], state_groups_ids: Dict[int, StateMap[str]]
@@ -605,47 +751,6 @@ def _make_state_cache_entry(
)
-def resolve_events_with_store(
- clock: Clock,
- room_id: str,
- room_version: str,
- state_sets: Sequence[StateMap[str]],
- event_map: Optional[Dict[str, EventBase]],
- state_res_store: "StateResolutionStore",
-) -> Awaitable[StateMap[str]]:
- """
- Args:
- room_id: the room we are working in
-
- room_version: Version of the room
-
- state_sets: List of dicts of (type, state_key) -> event_id,
- which are the different state groups to resolve.
-
- event_map:
- a dict from event_id to event, for any events that we happen to
- have in flight (eg, those currently being persisted). This will be
- used as a starting point fof finding the state we need; any missing
- events will be requested via state_map_factory.
-
- If None, all events will be fetched via state_res_store.
-
- state_res_store: a place to fetch events from
-
- Returns:
- a map from (type, state_key) to event_id.
- """
- v = KNOWN_ROOM_VERSIONS[room_version]
- if v.state_res == StateResolutionVersions.V1:
- return v1.resolve_events_with_store(
- room_id, state_sets, event_map, state_res_store.get_events
- )
- else:
- return v2.resolve_events_with_store(
- clock, room_id, room_version, state_sets, event_map, state_res_store
- )
-
-
@attr.s(slots=True)
class StateResolutionStore:
"""Interface that allows state resolution algorithms to access the database
diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py
index 686052bd83..92099f95ce 100644
--- a/synapse/storage/databases/main/metrics.py
+++ b/synapse/storage/databases/main/metrics.py
@@ -12,10 +12,8 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import typing
-from collections import Counter
-from synapse.metrics import BucketCollector
+from synapse.metrics import GaugeBucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import DatabasePool
@@ -23,6 +21,26 @@ from synapse.storage.databases.main.event_push_actions import (
EventPushActionsWorkerStore,
)
+# Collect metrics on the number of forward extremities that exist.
+_extremities_collecter = GaugeBucketCollector(
+ "synapse_forward_extremities",
+ "Number of rooms on the server with the given number of forward extremities"
+ " or fewer",
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500],
+)
+
+# we also expose metrics on the "number of excess extremity events", which is
+# (E-1)*N, where E is the number of extremities and N is the number of state
+# events in the room. This is an approximation to the number of state events
+# we could remove from state resolution by reducing the graph to a single
+# forward extremity.
+_excess_state_events_collecter = GaugeBucketCollector(
+ "synapse_excess_extremity_events",
+ "Number of rooms on the server with the given number of excess extremity "
+ "events, or fewer",
+ buckets=[0] + [1 << n for n in range(12)],
+)
+
class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
"""Functions to pull various metrics from the DB, for e.g. phone home
@@ -32,18 +50,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def __init__(self, database: DatabasePool, db_conn, hs):
super().__init__(database, db_conn, hs)
- # Collect metrics on the number of forward extremities that exist.
- # Counter of number of extremities to count
- self._current_forward_extremities_amount = (
- Counter()
- ) # type: typing.Counter[int]
-
- BucketCollector(
- "synapse_forward_extremities",
- lambda: self._current_forward_extremities_amount,
- buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
- )
-
# Read the extrems every 60 minutes
def read_forward_extremities():
# run as a background process to make sure that the database transactions
@@ -58,14 +64,25 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore):
def fetch(txn):
txn.execute(
"""
- select count(*) c from event_forward_extremities
- group by room_id
+ SELECT t1.c, t2.c
+ FROM (
+ SELECT room_id, COUNT(*) c FROM event_forward_extremities
+ GROUP BY room_id
+ ) t1 LEFT JOIN (
+ SELECT room_id, COUNT(*) c FROM current_state_events
+ GROUP BY room_id
+ ) t2 ON t1.room_id = t2.room_id
"""
)
return txn.fetchall()
res = await self.db_pool.runInteraction("read_forward_extremities", fetch)
- self._current_forward_extremities_amount = Counter([x[0] for x in res])
+
+ _extremities_collecter.update_data(x[0] for x in res)
+
+ _excess_state_events_collecter.update_data(
+ (x[0] - 1) * x[1] for x in res if x[1]
+ )
async def count_daily_messages(self):
"""
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index d7a03cbf7d..ecfc6717b3 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -42,17 +42,17 @@ class PurgeEventsStore(StateGroupWorkerStore, SQLBaseStore):
The set of state groups that are referenced by deleted events.
"""
+ parsed_token = await RoomStreamToken.parse(self, token)
+
return await self.db_pool.runInteraction(
"purge_history",
self._purge_history_txn,
room_id,
- token,
+ parsed_token,
delete_local_events,
)
- def _purge_history_txn(self, txn, room_id, token_str, delete_local_events):
- token = RoomStreamToken.parse(token_str)
-
+ def _purge_history_txn(self, txn, room_id, token, delete_local_events):
# Tables that should be pruned:
# event_auth
# event_backward_extremities
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 92e96468b4..37249f1e3f 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -35,7 +35,6 @@ what sort order was used:
- topological tokens: "t%d-%d", where the integers map to the topological
and stream ordering columns respectively.
"""
-
import abc
import logging
from collections import namedtuple
@@ -54,7 +53,7 @@ from synapse.storage.database import (
)
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-from synapse.types import Collection, RoomStreamToken
+from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
from synapse.util.caches.stream_change_cache import StreamChangeCache
if TYPE_CHECKING:
@@ -305,6 +304,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
def get_room_min_stream_ordering(self) -> int:
raise NotImplementedError()
+ def get_room_max_token(self) -> RoomStreamToken:
+ return RoomStreamToken(None, self.get_room_max_stream_ordering())
+
async def get_room_events_stream_for_rooms(
self,
room_ids: Collection[str],
@@ -611,26 +613,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
allow_none=allow_none,
)
- async def get_stream_token_for_event(self, event_id: str) -> RoomStreamToken:
- """The stream token for an event
- Args:
- event_id: The id of the event to look up a stream token for.
- Raises:
- StoreError if the event wasn't in the database.
- Returns:
- A stream token.
+ async def get_position_for_event(self, event_id: str) -> PersistedEventPosition:
+ """Get the persisted position for an event
"""
- stream_id = await self.get_stream_id_for_event(event_id)
- return RoomStreamToken(None, stream_id)
+ row = await self.db_pool.simple_select_one(
+ table="events",
+ keyvalues={"event_id": event_id},
+ retcols=("stream_ordering", "instance_name"),
+ desc="get_position_for_event",
+ )
+
+ return PersistedEventPosition(
+ row["instance_name"] or "master", row["stream_ordering"]
+ )
- async def get_topological_token_for_event(self, event_id: str) -> str:
+ async def get_topological_token_for_event(self, event_id: str) -> RoomStreamToken:
"""The stream token for an event
Args:
event_id: The id of the event to look up a stream token for.
Raises:
StoreError if the event wasn't in the database.
Returns:
- A "t%d-%d" topological token.
+ A `RoomStreamToken` topological token.
"""
row = await self.db_pool.simple_select_one(
table="events",
@@ -638,7 +642,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
retcols=("stream_ordering", "topological_ordering"),
desc="get_topological_token_for_event",
)
- return "t%d-%d" % (row["topological_ordering"], row["stream_ordering"])
+ return RoomStreamToken(row["topological_ordering"], row["stream_ordering"])
async def get_current_topological_token(self, room_id: str, stream_key: int) -> int:
"""Gets the topological token in a room after or at the given stream
@@ -687,8 +691,8 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore, metaclass=abc.ABCMeta):
else:
topo = None
internal = event.internal_metadata
- internal.before = str(RoomStreamToken(topo, stream - 1))
- internal.after = str(RoomStreamToken(topo, stream))
+ internal.before = RoomStreamToken(topo, stream - 1)
+ internal.after = RoomStreamToken(topo, stream)
internal.order = (int(topo) if topo else 0, int(stream))
async def get_events_around(
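
Callers that previously treated the topological token as a string now hold a structured token and must render it explicitly, as the admin API hunk above does. A sketch (assuming `store` is the main datastore and `event_id` is known to exist):

    async def purge_token_for(store, event_id: str) -> str:
        token = await store.get_topological_token_for_event(event_id)
        assert token.topological is not None
        return await token.to_string(store)  # "t<topological>-<stream>"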
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index ded6cf9655..72939f3984 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -229,7 +229,7 @@ class EventsPersistenceStorage:
defer.gatherResults(deferreds, consumeErrors=True)
)
- return RoomStreamToken(None, self.main_store.get_current_events_token())
+ return self.main_store.get_room_max_token()
async def persist_event(
self, event: EventBase, context: EventContext, backfilled: bool = False
@@ -247,11 +247,10 @@ class EventsPersistenceStorage:
await make_deferred_yieldable(deferred)
- max_persisted_id = self.main_store.get_current_events_token()
event_stream_id = event.internal_metadata.stream_ordering
pos = PersistedEventPosition(self._instance_name, event_stream_id)
- return pos, RoomStreamToken(None, max_persisted_id)
+ return pos, self.main_store.get_room_max_token()
def _maybe_start_persisting(self, room_id: str):
async def persisting_queue(item):
diff --git a/synapse/streams/config.py b/synapse/streams/config.py
index 0bdf846edf..fdda21d165 100644
--- a/synapse/streams/config.py
+++ b/synapse/streams/config.py
@@ -12,7 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import logging
from typing import Optional
@@ -21,6 +20,7 @@ import attr
from synapse.api.errors import SynapseError
from synapse.http.servlet import parse_integer, parse_string
from synapse.http.site import SynapseRequest
+from synapse.storage.databases.main import DataStore
from synapse.types import StreamToken
logger = logging.getLogger(__name__)
@@ -39,8 +39,9 @@ class PaginationConfig:
limit = attr.ib(type=Optional[int])
@classmethod
- def from_request(
+ async def from_request(
cls,
+ store: "DataStore",
request: SynapseRequest,
raise_invalid_params: bool = True,
default_limit: Optional[int] = None,
@@ -54,13 +55,13 @@ class PaginationConfig:
if from_tok == "END":
from_tok = None # For backwards compat.
elif from_tok:
- from_tok = StreamToken.from_string(from_tok)
+ from_tok = await StreamToken.from_string(store, from_tok)
except Exception:
raise SynapseError(400, "'from' parameter is invalid")
try:
if to_tok:
- to_tok = StreamToken.from_string(to_tok)
+ to_tok = await StreamToken.from_string(store, to_tok)
except Exception:
raise SynapseError(400, "'to' parameter is invalid")
diff --git a/synapse/types.py b/synapse/types.py
index 07b421077c..bc73e3775d 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -18,7 +18,17 @@ import re
import string
import sys
from collections import namedtuple
-from typing import Any, Dict, Mapping, MutableMapping, Optional, Tuple, Type, TypeVar
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Dict,
+ Mapping,
+ MutableMapping,
+ Optional,
+ Tuple,
+ Type,
+ TypeVar,
+)
import attr
from signedjson.key import decode_verify_key_bytes
@@ -27,6 +37,9 @@ from unpaddedbase64 import decode_base64
from synapse.api.errors import Codes, SynapseError
+if TYPE_CHECKING:
+ from synapse.storage.databases.main import DataStore
+
# define a version of typing.Collection that works on python 3.5
if sys.version_info[:3] >= (3, 6, 0):
from typing import Collection
@@ -407,7 +420,7 @@ class RoomStreamToken:
stream = attr.ib(type=int, validator=attr.validators.instance_of(int))
@classmethod
- def parse(cls, string: str) -> "RoomStreamToken":
+ async def parse(cls, store: "DataStore", string: str) -> "RoomStreamToken":
try:
if string[0] == "s":
return cls(topological=None, stream=int(string[1:]))
@@ -427,10 +440,22 @@ class RoomStreamToken:
pass
raise SynapseError(400, "Invalid token %r" % (string,))
+ def copy_and_advance(self, other: "RoomStreamToken") -> "RoomStreamToken":
+ """Return a new token such that if an event is after both this token and
+ the other token, then it's after the returned token too.
+ """
+
+ if self.topological or other.topological:
+ raise Exception("Can't advance topological tokens")
+
+ max_stream = max(self.stream, other.stream)
+
+ return RoomStreamToken(None, max_stream)
+
def as_tuple(self) -> Tuple[Optional[int], int]:
return (self.topological, self.stream)
- def __str__(self) -> str:
+ async def to_string(self, store: "DataStore") -> str:
if self.topological is not None:
return "t%d-%d" % (self.topological, self.stream)
else:
@@ -455,48 +480,51 @@ class StreamToken:
START = None # type: StreamToken
@classmethod
- def from_string(cls, string):
+ async def from_string(cls, store: "DataStore", string: str) -> "StreamToken":
try:
keys = string.split(cls._SEPARATOR)
while len(keys) < len(attr.fields(cls)):
# i.e. old token from before receipt_key
keys.append("0")
- return cls(RoomStreamToken.parse(keys[0]), *(int(k) for k in keys[1:]))
+ return cls(
+ await RoomStreamToken.parse(store, keys[0]), *(int(k) for k in keys[1:])
+ )
except Exception:
raise SynapseError(400, "Invalid Token")
- def to_string(self):
- return self._SEPARATOR.join([str(k) for k in attr.astuple(self, recurse=False)])
+ async def to_string(self, store: "DataStore") -> str:
+ return self._SEPARATOR.join(
+ [
+ await self.room_key.to_string(store),
+ str(self.presence_key),
+ str(self.typing_key),
+ str(self.receipt_key),
+ str(self.account_data_key),
+ str(self.push_rules_key),
+ str(self.to_device_key),
+ str(self.device_list_key),
+ str(self.groups_key),
+ ]
+ )
@property
def room_stream_id(self):
return self.room_key.stream
- def is_after(self, other):
- """Does this token contain events that the other doesn't?"""
- return (
- (other.room_stream_id < self.room_stream_id)
- or (int(other.presence_key) < int(self.presence_key))
- or (int(other.typing_key) < int(self.typing_key))
- or (int(other.receipt_key) < int(self.receipt_key))
- or (int(other.account_data_key) < int(self.account_data_key))
- or (int(other.push_rules_key) < int(self.push_rules_key))
- or (int(other.to_device_key) < int(self.to_device_key))
- or (int(other.device_list_key) < int(self.device_list_key))
- or (int(other.groups_key) < int(self.groups_key))
- )
-
def copy_and_advance(self, key, new_value) -> "StreamToken":
"""Advance the given key in the token to a new value if and only if the
new value is after the old value.
"""
- new_token = self.copy_and_replace(key, new_value)
if key == "room_key":
- new_id = new_token.room_stream_id
- old_id = self.room_stream_id
- else:
- new_id = int(getattr(new_token, key))
- old_id = int(getattr(self, key))
+ new_token = self.copy_and_replace(
+ "room_key", self.room_key.copy_and_advance(new_value)
+ )
+ return new_token
+
+ new_token = self.copy_and_replace(key, new_value)
+ new_id = int(getattr(new_token, key))
+ old_id = int(getattr(self, key))
+
if old_id < new_id:
return new_token
else:
@@ -506,7 +534,7 @@ class StreamToken:
return attr.evolve(self, **{key: new_value})
-StreamToken.START = StreamToken.from_string("s0_0")
+StreamToken.START = StreamToken(RoomStreamToken(None, 0), 0, 0, 0, 0, 0, 0, 0, 0)
@attr.s(slots=True, frozen=True)
@@ -523,6 +551,18 @@ class PersistedEventPosition:
def persisted_after(self, token: RoomStreamToken) -> bool:
return token.stream < self.stream
+ def to_room_stream_token(self) -> RoomStreamToken:
+ """Converts the position to a room stream token such that events
+ persisted in the same room after this position will be after the
+ returned `RoomStreamToken`.
+
+ Note: no guarantees are made about ordering w.r.t. events in other
+ rooms.
+ """
+ # Doing the naive thing satisfies the desired properties described in
+ # the docstring.
+ return RoomStreamToken(None, self.stream)
+
class ThirdPartyInstanceID(
namedtuple("ThirdPartyInstanceID", ("appservice_id", "network_id"))
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index 6e57c1ee72..ffdea0de8d 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -19,7 +19,11 @@ from typing import Any, Callable, Optional, TypeVar, cast
from prometheus_client import Counter
-from synapse.logging.context import LoggingContext, current_context
+from synapse.logging.context import (
+ ContextResourceUsage,
+ LoggingContext,
+ current_context,
+)
from synapse.metrics import InFlightGauge
logger = logging.getLogger(__name__)
@@ -104,27 +108,27 @@ class Measure:
def __init__(self, clock, name):
self.clock = clock
self.name = name
- self._logging_context = None
+ parent_context = current_context()
+ self._logging_context = LoggingContext(
+ "Measure[%s]" % (self.name,), parent_context
+ )
self.start = None
- def __enter__(self):
- if self._logging_context:
+ def __enter__(self) -> "Measure":
+ if self.start is not None:
raise RuntimeError("Measure() objects cannot be re-used")
self.start = self.clock.time()
- parent_context = current_context()
- self._logging_context = LoggingContext(
- "Measure[%s]" % (self.name,), parent_context
- )
self._logging_context.__enter__()
in_flight.register((self.name,), self._update_in_flight)
+ return self
def __exit__(self, exc_type, exc_val, exc_tb):
- if not self._logging_context:
+ if self.start is None:
raise RuntimeError("Measure() block exited without being entered")
duration = self.clock.time() - self.start
- usage = self._logging_context.get_resource_usage()
+ usage = self.get_resource_usage()
in_flight.unregister((self.name,), self._update_in_flight)
self._logging_context.__exit__(exc_type, exc_val, exc_tb)
@@ -140,6 +144,13 @@ class Measure:
except ValueError:
logger.warning("Failed to save metrics! Usage: %s", usage)
+ def get_resource_usage(self) -> ContextResourceUsage:
+ """Get the resources used within this Measure block
+
+ If the Measure block is still active, returns the resource usage so far.
+ """
+ return self._logging_context.get_resource_usage()
+
def _update_in_flight(self, metrics):
"""Gets called when processing in flight metrics
"""
diff --git a/tests/handlers/test_oidc.py b/tests/handlers/test_oidc.py
index 5910772aa8..d5087e58be 100644
--- a/tests/handlers/test_oidc.py
+++ b/tests/handlers/test_oidc.py
@@ -21,7 +21,6 @@ from mock import Mock, patch
import attr
import pymacaroons
-from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
@@ -87,6 +86,13 @@ class TestMappingProvider(OidcMappingProvider):
async def map_user_attributes(self, userinfo, token):
return {"localpart": userinfo["username"], "display_name": None}
+ # Do not include get_extra_attributes to test backwards compatibility paths.
+
+
+class TestMappingProviderExtra(TestMappingProvider):
+ async def get_extra_attributes(self, userinfo, token):
+ return {"phone": userinfo["phone"]}
+
def simple_async_mock(return_value=None, raises=None):
# AsyncMock is not available in python3.5, this mimics part of its behaviour
@@ -126,7 +132,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
config = self.default_config()
config["public_baseurl"] = BASE_URL
- oidc_config = config.get("oidc_config", {})
+ oidc_config = {}
oidc_config["enabled"] = True
oidc_config["client_id"] = CLIENT_ID
oidc_config["client_secret"] = CLIENT_SECRET
@@ -135,6 +141,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
oidc_config["user_mapping_provider"] = {
"module": __name__ + ".TestMappingProvider",
}
+
+ # Update this config with what's in the default config so that
+ # override_config works as expected.
+ oidc_config.update(config.get("oidc_config", {}))
config["oidc_config"] = oidc_config
hs = self.setup_test_homeserver(
@@ -165,11 +175,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.assertEqual(self.handler._client_auth.client_secret, CLIENT_SECRET)
@override_config({"oidc_config": {"discover": True}})
- @defer.inlineCallbacks
def test_discovery(self):
"""The handler should discover the endpoints from OIDC discovery document."""
# This would throw if some metadata were invalid
- metadata = yield defer.ensureDeferred(self.handler.load_metadata())
+ metadata = self.get_success(self.handler.load_metadata())
self.http_client.get_json.assert_called_once_with(WELL_KNOWN)
self.assertEqual(metadata.issuer, ISSUER)
@@ -181,43 +190,40 @@ class OidcHandlerTestCase(HomeserverTestCase):
# subsequent calls should be cached
self.http_client.reset_mock()
- yield defer.ensureDeferred(self.handler.load_metadata())
+ self.get_success(self.handler.load_metadata())
self.http_client.get_json.assert_not_called()
@override_config({"oidc_config": COMMON_CONFIG})
- @defer.inlineCallbacks
def test_no_discovery(self):
"""When discovery is disabled, it should not try to load from discovery document."""
- yield defer.ensureDeferred(self.handler.load_metadata())
+ self.get_success(self.handler.load_metadata())
self.http_client.get_json.assert_not_called()
@override_config({"oidc_config": COMMON_CONFIG})
- @defer.inlineCallbacks
def test_load_jwks(self):
"""JWKS loading is done once (then cached) if used."""
- jwks = yield defer.ensureDeferred(self.handler.load_jwks())
+ jwks = self.get_success(self.handler.load_jwks())
self.http_client.get_json.assert_called_once_with(JWKS_URI)
self.assertEqual(jwks, {"keys": []})
# subsequent calls should be cached…
self.http_client.reset_mock()
- yield defer.ensureDeferred(self.handler.load_jwks())
+ self.get_success(self.handler.load_jwks())
self.http_client.get_json.assert_not_called()
# …unless forced
self.http_client.reset_mock()
- yield defer.ensureDeferred(self.handler.load_jwks(force=True))
+ self.get_success(self.handler.load_jwks(force=True))
self.http_client.get_json.assert_called_once_with(JWKS_URI)
# Throw if the JWKS URI is missing
with self.metadata_edit({"jwks_uri": None}):
- with self.assertRaises(RuntimeError):
- yield defer.ensureDeferred(self.handler.load_jwks(force=True))
+ self.get_failure(self.handler.load_jwks(force=True), RuntimeError)
# Return empty key set if JWKS are not used
self.handler._scopes = [] # not asking the openid scope
self.http_client.get_json.reset_mock()
- jwks = yield defer.ensureDeferred(self.handler.load_jwks(force=True))
+ jwks = self.get_success(self.handler.load_jwks(force=True))
self.http_client.get_json.assert_not_called()
self.assertEqual(jwks, {"keys": []})
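The mechanical change running through this file replaces `@defer.inlineCallbacks` plus `yield defer.ensureDeferred(...)` with the test-case helpers. A hedged sketch of what those helpers do (the real implementations live in `tests/unittest.py`; `successResultOf`/`failureResultOf` are stock twisted.trial methods, while `pump` is Synapse's fake-reactor driver):

```python
from twisted.internet import defer

class DeferredHelperSketch:
    def get_success(self, d):
        # Wrap the coroutine, drive the fake reactor until it resolves,
        # then unpack the successful result synchronously.
        d = defer.ensureDeferred(d)
        self.pump()
        return self.successResultOf(d)

    def get_failure(self, d, exc_type):
        # As above, but expecting failure. Returns a twisted Failure,
        # which is why later hunks assert on `exc.value` rather than
        # the `exc.exception` that assertRaises provided.
        d = defer.ensureDeferred(d)
        self.pump()
        return self.failureResultOf(d, exc_type)
```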
@@ -299,11 +305,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
# This should not throw
self.handler._validate_metadata()
- @defer.inlineCallbacks
def test_redirect_request(self):
"""The redirect request has the right arguments & generates a valid session cookie."""
req = Mock(spec=["addCookie"])
- url = yield defer.ensureDeferred(
+ url = self.get_success(
self.handler.handle_redirect_request(req, b"http://client/redirect")
)
url = urlparse(url)
@@ -343,20 +348,18 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.assertEqual(params["nonce"], [nonce])
self.assertEqual(redirect, "http://client/redirect")
- @defer.inlineCallbacks
def test_callback_error(self):
"""Errors from the provider returned in the callback are displayed."""
self.handler._render_error = Mock()
request = Mock(args={})
request.args[b"error"] = [b"invalid_client"]
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_client", "")
request.args[b"error_description"] = [b"some description"]
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_client", "some description")
- @defer.inlineCallbacks
def test_callback(self):
"""Code callback works and display errors if something went wrong.
@@ -377,7 +380,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
"sub": "foo",
"preferred_username": "bar",
}
- user_id = UserID("foo", "domain.org")
+ user_id = "@foo:domain.org"
self.handler._render_error = Mock(return_value=None)
self.handler._exchange_code = simple_async_mock(return_value=token)
self.handler._parse_id_token = simple_async_mock(return_value=userinfo)
@@ -394,13 +397,12 @@ class OidcHandlerTestCase(HomeserverTestCase):
client_redirect_url = "http://client/redirect"
user_agent = "Browser"
ip_address = "10.0.0.1"
- session = self.handler._generate_oidc_session_token(
+ request.getCookie.return_value = self.handler._generate_oidc_session_token(
state=state,
nonce=nonce,
client_redirect_url=client_redirect_url,
ui_auth_session_id=None,
)
- request.getCookie.return_value = session
request.args = {}
request.args[b"code"] = [code.encode("utf-8")]
@@ -410,10 +412,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
request.requestHeaders.getRawHeaders.return_value = [user_agent.encode("ascii")]
request.getClientIP.return_value = ip_address
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.handler._auth_handler.complete_sso_login.assert_called_once_with(
- user_id, request, client_redirect_url,
+ user_id, request, client_redirect_url, {},
)
self.handler._exchange_code.assert_called_once_with(code)
self.handler._parse_id_token.assert_called_once_with(token, nonce=nonce)
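The new trailing `{}` in the assertion above corresponds to an extra-attributes argument on `complete_sso_login`. A sketch of the assumed signature (the real method is on the auth handler; only the tests' usage is certain from this patch):

```python
from typing import Any, Dict, Optional

async def complete_sso_login(
    registered_user_id: str,
    request: Any,
    client_redirect_url: str,
    extra_attributes: Optional[Dict[str, Any]] = None,
) -> None:
    # Assumed shape: {} when the mapping provider supplies nothing,
    # populated (e.g. {"phone": ...}) when get_extra_attributes returns
    # data to forward to the client after login.
    ...
```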
@@ -427,13 +429,13 @@ class OidcHandlerTestCase(HomeserverTestCase):
self.handler._map_userinfo_to_user = simple_async_mock(
raises=MappingException()
)
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("mapping_error")
self.handler._map_userinfo_to_user = simple_async_mock(return_value=user_id)
# Handle ID token errors
self.handler._parse_id_token = simple_async_mock(raises=Exception())
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_token")
self.handler._auth_handler.complete_sso_login.reset_mock()
@@ -444,10 +446,10 @@ class OidcHandlerTestCase(HomeserverTestCase):
# With userinfo fetching
self.handler._scopes = [] # do not ask the "openid" scope
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.handler._auth_handler.complete_sso_login.assert_called_once_with(
- user_id, request, client_redirect_url,
+ user_id, request, client_redirect_url, {},
)
self.handler._exchange_code.assert_called_once_with(code)
self.handler._parse_id_token.assert_not_called()
@@ -459,17 +461,16 @@ class OidcHandlerTestCase(HomeserverTestCase):
# Handle userinfo fetching error
self.handler._fetch_userinfo = simple_async_mock(raises=Exception())
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("fetch_error")
# Handle code exchange failure
self.handler._exchange_code = simple_async_mock(
raises=OidcError("invalid_request")
)
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_request")
- @defer.inlineCallbacks
def test_callback_session(self):
"""The callback verifies the session presence and validity"""
self.handler._render_error = Mock(return_value=None)
@@ -478,20 +479,20 @@ class OidcHandlerTestCase(HomeserverTestCase):
# Missing cookie
request.args = {}
request.getCookie.return_value = None
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("missing_session", "No session cookie found")
# Missing session parameter
request.args = {}
request.getCookie.return_value = "session"
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_request", "State parameter is missing")
# Invalid cookie
request.args = {}
request.args[b"state"] = [b"state"]
request.getCookie.return_value = "session"
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_session")
# Mismatching session
@@ -504,18 +505,17 @@ class OidcHandlerTestCase(HomeserverTestCase):
request.args = {}
request.args[b"state"] = [b"mismatching state"]
request.getCookie.return_value = session
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("mismatching_session")
# Valid session
request.args = {}
request.args[b"state"] = [b"state"]
request.getCookie.return_value = session
- yield defer.ensureDeferred(self.handler.handle_oidc_callback(request))
+ self.get_success(self.handler.handle_oidc_callback(request))
self.assertRenderedError("invalid_request")
@override_config({"oidc_config": {"client_auth_method": "client_secret_post"}})
- @defer.inlineCallbacks
def test_exchange_code(self):
"""Code exchange behaves correctly and handles various error scenarios."""
token = {"type": "bearer"}
@@ -524,7 +524,7 @@ class OidcHandlerTestCase(HomeserverTestCase):
return_value=FakeResponse(code=200, phrase=b"OK", body=token_json)
)
code = "code"
- ret = yield defer.ensureDeferred(self.handler._exchange_code(code))
+ ret = self.get_success(self.handler._exchange_code(code))
kwargs = self.http_client.request.call_args[1]
self.assertEqual(ret, token)
@@ -546,10 +546,9 @@ class OidcHandlerTestCase(HomeserverTestCase):
body=b'{"error": "foo", "error_description": "bar"}',
)
)
- with self.assertRaises(OidcError) as exc:
- yield defer.ensureDeferred(self.handler._exchange_code(code))
- self.assertEqual(exc.exception.error, "foo")
- self.assertEqual(exc.exception.error_description, "bar")
+ exc = self.get_failure(self.handler._exchange_code(code), OidcError)
+ self.assertEqual(exc.value.error, "foo")
+ self.assertEqual(exc.value.error_description, "bar")
# Internal server error with no JSON body
self.http_client.request = simple_async_mock(
@@ -557,9 +556,8 @@ class OidcHandlerTestCase(HomeserverTestCase):
code=500, phrase=b"Internal Server Error", body=b"Not JSON",
)
)
- with self.assertRaises(OidcError) as exc:
- yield defer.ensureDeferred(self.handler._exchange_code(code))
- self.assertEqual(exc.exception.error, "server_error")
+ exc = self.get_failure(self.handler._exchange_code(code), OidcError)
+ self.assertEqual(exc.value.error, "server_error")
# Internal server error with JSON body
self.http_client.request = simple_async_mock(
@@ -569,17 +567,16 @@ class OidcHandlerTestCase(HomeserverTestCase):
body=b'{"error": "internal_server_error"}',
)
)
- with self.assertRaises(OidcError) as exc:
- yield defer.ensureDeferred(self.handler._exchange_code(code))
- self.assertEqual(exc.exception.error, "internal_server_error")
+
+ exc = self.get_failure(self.handler._exchange_code(code), OidcError)
+ self.assertEqual(exc.value.error, "internal_server_error")
# 4xx error without "error" field
self.http_client.request = simple_async_mock(
return_value=FakeResponse(code=400, phrase=b"Bad request", body=b"{}",)
)
- with self.assertRaises(OidcError) as exc:
- yield defer.ensureDeferred(self.handler._exchange_code(code))
- self.assertEqual(exc.exception.error, "server_error")
+ exc = self.get_failure(self.handler._exchange_code(code), OidcError)
+ self.assertEqual(exc.value.error, "server_error")
# 2xx error with "error" field
self.http_client.request = simple_async_mock(
@@ -587,9 +584,62 @@ class OidcHandlerTestCase(HomeserverTestCase):
code=200, phrase=b"OK", body=b'{"error": "some_error"}',
)
)
- with self.assertRaises(OidcError) as exc:
- yield defer.ensureDeferred(self.handler._exchange_code(code))
- self.assertEqual(exc.exception.error, "some_error")
+ exc = self.get_failure(self.handler._exchange_code(code), OidcError)
+ self.assertEqual(exc.value.error, "some_error")
+
+ @override_config(
+ {
+ "oidc_config": {
+ "user_mapping_provider": {
+ "module": __name__ + ".TestMappingProviderExtra"
+ }
+ }
+ }
+ )
+ def test_extra_attributes(self):
+ """
+ Login while using a mapping provider that implements get_extra_attributes.
+ """
+ token = {
+ "type": "bearer",
+ "id_token": "id_token",
+ "access_token": "access_token",
+ }
+ userinfo = {
+ "sub": "foo",
+ "phone": "1234567",
+ }
+ user_id = "@foo:domain.org"
+ self.handler._exchange_code = simple_async_mock(return_value=token)
+ self.handler._parse_id_token = simple_async_mock(return_value=userinfo)
+ self.handler._map_userinfo_to_user = simple_async_mock(return_value=user_id)
+ self.handler._auth_handler.complete_sso_login = simple_async_mock()
+ request = Mock(
+ spec=["args", "getCookie", "addCookie", "requestHeaders", "getClientIP"]
+ )
+
+ state = "state"
+ client_redirect_url = "http://client/redirect"
+ request.getCookie.return_value = self.handler._generate_oidc_session_token(
+ state=state,
+ nonce="nonce",
+ client_redirect_url=client_redirect_url,
+ ui_auth_session_id=None,
+ )
+
+ request.args = {}
+ request.args[b"code"] = [b"code"]
+ request.args[b"state"] = [state.encode("utf-8")]
+
+ request.requestHeaders = Mock(spec=["getRawHeaders"])
+ request.requestHeaders.getRawHeaders.return_value = [b"Browser"]
+ request.getClientIP.return_value = "10.0.0.1"
+
+ self.get_success(self.handler.handle_oidc_callback(request))
+
+ self.handler._auth_handler.complete_sso_login.assert_called_once_with(
+ user_id, request, client_redirect_url, {"phone": "1234567"},
+ )
def test_map_userinfo_to_user(self):
"""Ensure that mapping the userinfo returned from a provider to an MXID works properly."""
diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 0a567b032f..0d809d25d5 100644
--- a/tests/rest/client/v1/test_rooms.py
+++ b/tests/rest/client/v1/test_rooms.py
@@ -905,6 +905,7 @@ class RoomMessageListTestCase(RoomBase):
first_token = self.get_success(
store.get_topological_token_for_event(first_event_id)
)
+ first_token_str = self.get_success(first_token.to_string(store))
# Send a second message in the room, which won't be removed, and which we'll
# use as the marker to purge events before.
@@ -912,6 +913,7 @@ class RoomMessageListTestCase(RoomBase):
second_token = self.get_success(
store.get_topological_token_for_event(second_event_id)
)
+ second_token_str = self.get_success(second_token.to_string(store))
# Send a third event in the room to ensure we don't fall under any edge case
# due to our marker being the latest forward extremity in the room.
@@ -921,7 +923,11 @@ class RoomMessageListTestCase(RoomBase):
request, channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
- % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})),
+ % (
+ self.room_id,
+ second_token_str,
+ json.dumps({"types": [EventTypes.Message]}),
+ ),
)
self.render(request)
self.assertEqual(channel.code, 200, channel.json_body)
@@ -936,7 +942,7 @@ class RoomMessageListTestCase(RoomBase):
pagination_handler._purge_history(
purge_id=purge_id,
room_id=self.room_id,
- token=second_token,
+ token=second_token_str,
delete_local_events=True,
)
)
@@ -946,7 +952,11 @@ class RoomMessageListTestCase(RoomBase):
request, channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
- % (self.room_id, second_token, json.dumps({"types": [EventTypes.Message]})),
+ % (
+ self.room_id,
+ second_token_str,
+ json.dumps({"types": [EventTypes.Message]}),
+ ),
)
self.render(request)
self.assertEqual(channel.code, 200, channel.json_body)
@@ -960,7 +970,11 @@ class RoomMessageListTestCase(RoomBase):
request, channel = self.make_request(
"GET",
"/rooms/%s/messages?access_token=x&from=%s&dir=b&filter=%s"
- % (self.room_id, first_token, json.dumps({"types": [EventTypes.Message]})),
+ % (
+ self.room_id,
+ first_token_str,
+ json.dumps({"types": [EventTypes.Message]}),
+ ),
)
self.render(request)
self.assertEqual(channel.code, 200, channel.json_body)
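The thread running through this file: `get_topological_token_for_event` now returns a token object rather than a plain string, and rendering it for the client API is itself asynchronous. A minimal sketch of the pattern (method names taken from the diff; the store and token types are assumptions):

```python
async def messages_url(store, room_id: str, event_id: str) -> str:
    # Serialising the token may require a database round-trip, hence
    # the awaitable to_string(store) rather than a plain str().
    token = await store.get_topological_token_for_event(event_id)
    token_str = await token.to_string(store)
    return "/rooms/%s/messages?from=%s&dir=b" % (room_id, token_str)
```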
diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py
index 949846fe33..3957471f3f 100644
--- a/tests/storage/test_event_metrics.py
+++ b/tests/storage/test_event_metrics.py
@@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
self.reactor.advance(60 * 60 * 1000)
self.pump(1)
- items = set(
+ items = list(
filter(
lambda x: b"synapse_forward_extremities_" in x,
- generate_latest(REGISTRY).split(b"\n"),
+ generate_latest(REGISTRY, emit_help=False).split(b"\n"),
)
)
- expected = {
+ expected = [
b'synapse_forward_extremities_bucket{le="1.0"} 0.0',
b'synapse_forward_extremities_bucket{le="2.0"} 2.0',
b'synapse_forward_extremities_bucket{le="3.0"} 2.0',
@@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase):
b'synapse_forward_extremities_bucket{le="100.0"} 3.0',
b'synapse_forward_extremities_bucket{le="200.0"} 3.0',
b'synapse_forward_extremities_bucket{le="500.0"} 3.0',
- b'synapse_forward_extremities_bucket{le="+Inf"} 3.0',
- b"synapse_forward_extremities_count 3.0",
- b"synapse_forward_extremities_sum 10.0",
- }
-
+ # per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9,
+ # "inf" is valid: "this includes variants such as inf"
+ b'synapse_forward_extremities_bucket{le="inf"} 3.0',
+ b"# TYPE synapse_forward_extremities_gcount gauge",
+ b"synapse_forward_extremities_gcount 3.0",
+ b"# TYPE synapse_forward_extremities_gsum gauge",
+ b"synapse_forward_extremities_gsum 10.0",
+ ]
self.assertEqual(items, expected)
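Two changes land together here: the expectation becomes an ordered list, since the interleaved `# TYPE` lines make output ordering significant, and `generate_latest` gains an `emit_help` flag, which suggests Synapse's own exposition helper rather than stock `prometheus_client`. A sketch of the filtering pattern under those assumptions (the import path for the patched helper is a guess):

```python
from prometheus_client import REGISTRY

from synapse.metrics import generate_latest  # assumed import path

exposition = generate_latest(REGISTRY, emit_help=False)
items = [
    line
    for line in exposition.split(b"\n")
    if b"synapse_forward_extremities_" in line
]
```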
diff --git a/tests/storage/test_purge.py b/tests/storage/test_purge.py
index 918387733b..cc1f3c53c5 100644
--- a/tests/storage/test_purge.py
+++ b/tests/storage/test_purge.py
@@ -47,12 +47,15 @@ class PurgeTests(HomeserverTestCase):
storage = self.hs.get_storage()
# Get the topological token
- event = self.get_success(
+ token = self.get_success(
store.get_topological_token_for_event(last["event_id"])
)
+ token_str = self.get_success(token.to_string(self.hs.get_datastore()))
# Purge everything before this topological token
- self.get_success(storage.purge_events.purge_history(self.room_id, event, True))
+ self.get_success(
+ storage.purge_events.purge_history(self.room_id, token_str, True)
+ )
# 1-3 should fail and last will succeed, meaning that 1-3 are deleted
# and last is not.
@@ -74,12 +77,10 @@ class PurgeTests(HomeserverTestCase):
storage = self.hs.get_datastore()
# Set the topological token higher than it should be
- event = self.get_success(
+ token = self.get_success(
storage.get_topological_token_for_event(last["event_id"])
)
- event = "t{}-{}".format(
- *list(map(lambda x: x + 1, map(int, event[1:].split("-"))))
- )
+ event = "t{}-{}".format(token.topological + 1, token.stream + 1)
# Purge everything before this topological token
purge = defer.ensureDeferred(storage.purge_history(self.room_id, event, True))
|