diff --git a/CHANGES.md b/CHANGES.md
index a4c3ce31ae..a299110a6b 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,76 @@
+Synapse 0.33.2 (2018-08-09)
+===========================
+
+No significant changes.
+
+
+Synapse 0.33.2rc1 (2018-08-07)
+==============================
+
+Features
+--------
+
+- add support for the lazy_loaded_members filter as per MSC1227 ([\#2970](https://github.com/matrix-org/synapse/issues/2970))
+- add support for the include_redundant_members filter param as per MSC1227 ([\#3331](https://github.com/matrix-org/synapse/issues/3331))
+- Add metrics to track resource usage by background processes ([\#3553](https://github.com/matrix-org/synapse/issues/3553), [\#3556](https://github.com/matrix-org/synapse/issues/3556), [\#3604](https://github.com/matrix-org/synapse/issues/3604), [\#3610](https://github.com/matrix-org/synapse/issues/3610))
+- Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric ([\#3554](https://github.com/matrix-org/synapse/issues/3554))
+- Add support for client_reader to handle more APIs ([\#3555](https://github.com/matrix-org/synapse/issues/3555), [\#3597](https://github.com/matrix-org/synapse/issues/3597))
+- make the /context API filter & lazy-load aware as per MSC1227 ([\#3567](https://github.com/matrix-org/synapse/issues/3567))
+- Add ability to limit number of monthly active users on the server ([\#3630](https://github.com/matrix-org/synapse/issues/3630))
+- When we fail to join a room over federation, pass the error code back to the client. ([\#3639](https://github.com/matrix-org/synapse/issues/3639))
+- Add a new /admin/register API for non-interactively creating users. ([\#3415](https://github.com/matrix-org/synapse/issues/3415))
+
+
+Bugfixes
+--------
+
+- Make /directory/list API return 404 for room not found instead of 400 ([\#2952](https://github.com/matrix-org/synapse/issues/2952))
+- Default inviter_display_name to mxid for email invites ([\#3391](https://github.com/matrix-org/synapse/issues/3391))
+- Don't generate TURN credentials if no TURN config options are set ([\#3514](https://github.com/matrix-org/synapse/issues/3514))
+- Correctly announce deleted devices over federation ([\#3520](https://github.com/matrix-org/synapse/issues/3520))
+- Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis. ([\#3548](https://github.com/matrix-org/synapse/issues/3548))
+- Unicode passwords are now normalised before hashing, preventing the instance where two different devices or browsers might send a different UTF-8 sequence for the password. ([\#3569](https://github.com/matrix-org/synapse/issues/3569))
+- Fix potential stack overflow and deadlock under heavy load ([\#3570](https://github.com/matrix-org/synapse/issues/3570))
+- Respond with M_NOT_FOUND when profiles are not found locally or over federation. Fixes #3585 ([\#3585](https://github.com/matrix-org/synapse/issues/3585))
+- Fix failure to persist events over federation under load ([\#3601](https://github.com/matrix-org/synapse/issues/3601))
+- Fix updating of cached remote profiles ([\#3605](https://github.com/matrix-org/synapse/issues/3605))
+- Fix 'tuple index out of range' error ([\#3607](https://github.com/matrix-org/synapse/issues/3607))
+- Only import secrets when available (fix for py < 3.6) ([\#3626](https://github.com/matrix-org/synapse/issues/3626))
+
+
+Internal Changes
+----------------
+
+- Remove redundant checks on who_forgot_in_room ([\#3350](https://github.com/matrix-org/synapse/issues/3350))
+- Remove unnecessary event re-signing hacks ([\#3367](https://github.com/matrix-org/synapse/issues/3367))
+- Rewrite cache list decorator ([\#3384](https://github.com/matrix-org/synapse/issues/3384))
+- Move v1-only REST APIs into their own module. ([\#3460](https://github.com/matrix-org/synapse/issues/3460))
+- Replace more instances of Python 2-only iteritems and itervalues uses. ([\#3562](https://github.com/matrix-org/synapse/issues/3562))
+- Refactor EventContext to accept state during init ([\#3577](https://github.com/matrix-org/synapse/issues/3577))
+- Improve Dockerfile and docker-compose instructions ([\#3543](https://github.com/matrix-org/synapse/issues/3543))
+- Release notes are now in the Markdown format. ([\#3552](https://github.com/matrix-org/synapse/issues/3552))
+- add config for pep8 ([\#3559](https://github.com/matrix-org/synapse/issues/3559))
+- Merge Linearizer and Limiter ([\#3571](https://github.com/matrix-org/synapse/issues/3571), [\#3572](https://github.com/matrix-org/synapse/issues/3572))
+- Lazily load state on master process when using workers to reduce DB consumption ([\#3579](https://github.com/matrix-org/synapse/issues/3579), [\#3581](https://github.com/matrix-org/synapse/issues/3581), [\#3582](https://github.com/matrix-org/synapse/issues/3582), [\#3584](https://github.com/matrix-org/synapse/issues/3584))
+- Fixes and optimisations for resolve_state_groups ([\#3586](https://github.com/matrix-org/synapse/issues/3586))
+- Improve logging for exceptions when handling PDUs ([\#3587](https://github.com/matrix-org/synapse/issues/3587))
+- Add some measure blocks to persist_events ([\#3590](https://github.com/matrix-org/synapse/issues/3590))
+- Fix some random logcontext leaks. ([\#3591](https://github.com/matrix-org/synapse/issues/3591), [\#3606](https://github.com/matrix-org/synapse/issues/3606))
+- Speed up calculating state deltas in persist_event loop ([\#3592](https://github.com/matrix-org/synapse/issues/3592))
+- Attempt to reduce amount of state pulled out of DB during persist_events ([\#3595](https://github.com/matrix-org/synapse/issues/3595))
+- Fix a documentation typo in on_make_leave_request ([\#3609](https://github.com/matrix-org/synapse/issues/3609))
+- Make EventStore inherit from EventFederationStore ([\#3612](https://github.com/matrix-org/synapse/issues/3612))
+- Remove some redundant joins on event_edges.room_id ([\#3613](https://github.com/matrix-org/synapse/issues/3613))
+- Stop populating events.content ([\#3614](https://github.com/matrix-org/synapse/issues/3614))
+- Update the /send_leave path registration to use event_id rather than a transaction ID. ([\#3616](https://github.com/matrix-org/synapse/issues/3616))
+- Refactor FederationHandler to move DB writes into separate functions ([\#3621](https://github.com/matrix-org/synapse/issues/3621))
+- Remove unused field "pdu_failures" from transactions. ([\#3628](https://github.com/matrix-org/synapse/issues/3628))
+- rename replication_layer to federation_client ([\#3634](https://github.com/matrix-org/synapse/issues/3634))
+- Factor out exception handling in federation_client ([\#3638](https://github.com/matrix-org/synapse/issues/3638))
+- Refactor location of docker build script. ([\#3644](https://github.com/matrix-org/synapse/issues/3644))
+- Update CONTRIBUTING to mention newsfragments. ([\#3645](https://github.com/matrix-org/synapse/issues/3645))
+
+
Synapse 0.33.1 (2018-08-02)
===========================
diff --git a/changelog.d/2952.bugfix b/changelog.d/2952.bugfix
deleted file mode 100644
index 07a3e48304..0000000000
--- a/changelog.d/2952.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Make /directory/list API return 404 for room not found instead of 400
diff --git a/changelog.d/2970.feature b/changelog.d/2970.feature
deleted file mode 100644
index 5eb928563f..0000000000
--- a/changelog.d/2970.feature
+++ /dev/null
@@ -1 +0,0 @@
-add support for the lazy_loaded_members filter as per MSC1227
diff --git a/changelog.d/3331.feature b/changelog.d/3331.feature
deleted file mode 100644
index e574b9bcc3..0000000000
--- a/changelog.d/3331.feature
+++ /dev/null
@@ -1 +0,0 @@
-add support for the include_redundant_members filter param as per MSC1227
diff --git a/changelog.d/3350.misc b/changelog.d/3350.misc
deleted file mode 100644
index 3713cd6d63..0000000000
--- a/changelog.d/3350.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove redundant checks on who_forgot_in_room
\ No newline at end of file
diff --git a/changelog.d/3367.misc b/changelog.d/3367.misc
deleted file mode 100644
index 1f21ddea48..0000000000
--- a/changelog.d/3367.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove unnecessary event re-signing hacks
\ No newline at end of file
diff --git a/changelog.d/3384.misc b/changelog.d/3384.misc
deleted file mode 100644
index 5d56c876d9..0000000000
--- a/changelog.d/3384.misc
+++ /dev/null
@@ -1 +0,0 @@
-Rewrite cache list decorator
diff --git a/changelog.d/3391.bugfix b/changelog.d/3391.bugfix
deleted file mode 100644
index 88eeb50df2..0000000000
--- a/changelog.d/3391.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Default inviter_display_name to mxid for email invites
\ No newline at end of file
diff --git a/changelog.d/3415.misc b/changelog.d/3415.misc
deleted file mode 100644
index e69de29bb2..0000000000
--- a/changelog.d/3415.misc
+++ /dev/null
diff --git a/changelog.d/3460.misc b/changelog.d/3460.misc
deleted file mode 100644
index e69de29bb2..0000000000
--- a/changelog.d/3460.misc
+++ /dev/null
diff --git a/changelog.d/3514.bugfix b/changelog.d/3514.bugfix
deleted file mode 100644
index 460fe24ac9..0000000000
--- a/changelog.d/3514.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Don't generate TURN credentials if no TURN config options are set
diff --git a/changelog.d/3520.bugfix b/changelog.d/3520.bugfix
deleted file mode 100644
index 9278cb3708..0000000000
--- a/changelog.d/3520.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Correctly announce deleted devices over federation
diff --git a/changelog.d/3543.misc b/changelog.d/3543.misc
deleted file mode 100644
index d231d17749..0000000000
--- a/changelog.d/3543.misc
+++ /dev/null
@@ -1 +0,0 @@
-Improve Dockerfile and docker-compose instructions
diff --git a/changelog.d/3548.bugfix b/changelog.d/3548.bugfix
deleted file mode 100644
index 38dc3b1232..0000000000
--- a/changelog.d/3548.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis.
diff --git a/changelog.d/3552.misc b/changelog.d/3552.misc
deleted file mode 100644
index 709c3282b4..0000000000
--- a/changelog.d/3552.misc
+++ /dev/null
@@ -1 +0,0 @@
-Release notes are now in the Markdown format.
diff --git a/changelog.d/3553.feature b/changelog.d/3553.feature
deleted file mode 100644
index 77a294cb9f..0000000000
--- a/changelog.d/3553.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add metrics to track resource usage by background processes
diff --git a/changelog.d/3554.feature b/changelog.d/3554.feature
deleted file mode 100644
index b00397872c..0000000000
--- a/changelog.d/3554.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric
diff --git a/changelog.d/3555.feature b/changelog.d/3555.feature
deleted file mode 100644
index ea4a85e0ae..0000000000
--- a/changelog.d/3555.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add support for client_reader to handle more APIs
diff --git a/changelog.d/3556.feature b/changelog.d/3556.feature
deleted file mode 100644
index 77a294cb9f..0000000000
--- a/changelog.d/3556.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add metrics to track resource usage by background processes
diff --git a/changelog.d/3559.misc b/changelog.d/3559.misc
deleted file mode 100644
index 26df859e45..0000000000
--- a/changelog.d/3559.misc
+++ /dev/null
@@ -1 +0,0 @@
-add config for pep8
diff --git a/changelog.d/3562.misc b/changelog.d/3562.misc
deleted file mode 100644
index e69de29bb2..0000000000
--- a/changelog.d/3562.misc
+++ /dev/null
diff --git a/changelog.d/3567.feature b/changelog.d/3567.feature
deleted file mode 100644
index c74c1f57a9..0000000000
--- a/changelog.d/3567.feature
+++ /dev/null
@@ -1 +0,0 @@
-make the /context API filter & lazy-load aware as per MSC1227
diff --git a/changelog.d/3569.bugfix b/changelog.d/3569.bugfix
deleted file mode 100644
index d77f035ee0..0000000000
--- a/changelog.d/3569.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Unicode passwords are now normalised before hashing, preventing the instance where two different devices or browsers might send a different UTF-8 sequence for the password.
diff --git a/changelog.d/3570.bugfix b/changelog.d/3570.bugfix
deleted file mode 100644
index cec5158a99..0000000000
--- a/changelog.d/3570.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix potential stack overflow and deadlock under heavy load
\ No newline at end of file
diff --git a/changelog.d/3571.misc b/changelog.d/3571.misc
deleted file mode 100644
index 8908324e68..0000000000
--- a/changelog.d/3571.misc
+++ /dev/null
@@ -1 +0,0 @@
-Merge Linearizer and Limiter
diff --git a/changelog.d/3572.misc b/changelog.d/3572.misc
deleted file mode 100644
index 8908324e68..0000000000
--- a/changelog.d/3572.misc
+++ /dev/null
@@ -1 +0,0 @@
-Merge Linearizer and Limiter
diff --git a/changelog.d/3577.misc b/changelog.d/3577.misc
deleted file mode 100644
index e69de29bb2..0000000000
--- a/changelog.d/3577.misc
+++ /dev/null
diff --git a/changelog.d/3579.misc b/changelog.d/3579.misc
deleted file mode 100644
index 2374dc0c44..0000000000
--- a/changelog.d/3579.misc
+++ /dev/null
@@ -1 +0,0 @@
-Lazily load state on master process when using workers to reduce DB consumption
diff --git a/changelog.d/3581.misc b/changelog.d/3581.misc
deleted file mode 100644
index 2374dc0c44..0000000000
--- a/changelog.d/3581.misc
+++ /dev/null
@@ -1 +0,0 @@
-Lazily load state on master process when using workers to reduce DB consumption
diff --git a/changelog.d/3582.misc b/changelog.d/3582.misc
deleted file mode 100644
index 2374dc0c44..0000000000
--- a/changelog.d/3582.misc
+++ /dev/null
@@ -1 +0,0 @@
-Lazily load state on master process when using workers to reduce DB consumption
diff --git a/changelog.d/3584.misc b/changelog.d/3584.misc
deleted file mode 100644
index 2374dc0c44..0000000000
--- a/changelog.d/3584.misc
+++ /dev/null
@@ -1 +0,0 @@
-Lazily load state on master process when using workers to reduce DB consumption
diff --git a/changelog.d/3585.bugfix b/changelog.d/3585.bugfix
deleted file mode 100644
index e8ae1d8cb4..0000000000
--- a/changelog.d/3585.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Respond with M_NOT_FOUND when profiles are not found locally or over federation. Fixes #3585
diff --git a/changelog.d/3586.misc b/changelog.d/3586.misc
deleted file mode 100644
index e853e2481b..0000000000
--- a/changelog.d/3586.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fixes and optimisations for resolve_state_groups
diff --git a/changelog.d/3587.misc b/changelog.d/3587.misc
deleted file mode 100644
index 75a3479910..0000000000
--- a/changelog.d/3587.misc
+++ /dev/null
@@ -1 +0,0 @@
-Improve logging for exceptions when handling PDUs
\ No newline at end of file
diff --git a/changelog.d/3590.misc b/changelog.d/3590.misc
deleted file mode 100644
index 0f1688fd0f..0000000000
--- a/changelog.d/3590.misc
+++ /dev/null
@@ -1 +0,0 @@
-Add some measure blocks to persist_events
diff --git a/changelog.d/3591.misc b/changelog.d/3591.misc
deleted file mode 100644
index f0137766a0..0000000000
--- a/changelog.d/3591.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix some random logcontext leaks.
\ No newline at end of file
diff --git a/changelog.d/3592.misc b/changelog.d/3592.misc
deleted file mode 100644
index 60129569c2..0000000000
--- a/changelog.d/3592.misc
+++ /dev/null
@@ -1 +0,0 @@
-Speed up calculating state deltas in persist_event loop
diff --git a/changelog.d/3595.misc b/changelog.d/3595.misc
deleted file mode 100644
index 85903504cc..0000000000
--- a/changelog.d/3595.misc
+++ /dev/null
@@ -1 +0,0 @@
-Attempt to reduce amount of state pulled out of DB during persist_events
diff --git a/changelog.d/3597.feature b/changelog.d/3597.feature
deleted file mode 100644
index ea4a85e0ae..0000000000
--- a/changelog.d/3597.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add support for client_reader to handle more APIs
diff --git a/changelog.d/3601.bugfix b/changelog.d/3601.bugfix
deleted file mode 100644
index 1678b261d0..0000000000
--- a/changelog.d/3601.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix failure to persist events over federation under load
diff --git a/changelog.d/3604.feature b/changelog.d/3604.feature
deleted file mode 100644
index 77a294cb9f..0000000000
--- a/changelog.d/3604.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add metrics to track resource usage by background processes
diff --git a/changelog.d/3605.bugfix b/changelog.d/3605.bugfix
deleted file mode 100644
index 786da546eb..0000000000
--- a/changelog.d/3605.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix updating of cached remote profiles
diff --git a/changelog.d/3606.misc b/changelog.d/3606.misc
deleted file mode 100644
index f0137766a0..0000000000
--- a/changelog.d/3606.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix some random logcontext leaks.
\ No newline at end of file
diff --git a/changelog.d/3607.bugfix b/changelog.d/3607.bugfix
deleted file mode 100644
index 7ad64593b8..0000000000
--- a/changelog.d/3607.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Fix 'tuple index out of range' error
\ No newline at end of file
diff --git a/changelog.d/3609.misc b/changelog.d/3609.misc
deleted file mode 100644
index 5b9566d076..0000000000
--- a/changelog.d/3609.misc
+++ /dev/null
@@ -1 +0,0 @@
-Fix a documentation typo in on_make_leave_request
diff --git a/changelog.d/3610.feature b/changelog.d/3610.feature
deleted file mode 100644
index 77a294cb9f..0000000000
--- a/changelog.d/3610.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add metrics to track resource usage by background processes
diff --git a/changelog.d/3612.misc b/changelog.d/3612.misc
deleted file mode 100644
index f90d2f2ff5..0000000000
--- a/changelog.d/3612.misc
+++ /dev/null
@@ -1 +0,0 @@
-Make EventStore inherit from EventFederationStore
diff --git a/changelog.d/3613.misc b/changelog.d/3613.misc
deleted file mode 100644
index d9378f6b49..0000000000
--- a/changelog.d/3613.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove some redundant joins on event_edges.room_id
diff --git a/changelog.d/3614.misc b/changelog.d/3614.misc
deleted file mode 100644
index 356f28471e..0000000000
--- a/changelog.d/3614.misc
+++ /dev/null
@@ -1 +0,0 @@
-Stop populating events.content
diff --git a/changelog.d/3616.misc b/changelog.d/3616.misc
deleted file mode 100644
index 04629faa36..0000000000
--- a/changelog.d/3616.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update the /send_leave path registration to use event_id rather than a transaction ID.
diff --git a/changelog.d/3621.misc b/changelog.d/3621.misc
deleted file mode 100644
index 83e5fc8aa1..0000000000
--- a/changelog.d/3621.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor FederationHandler to move DB writes into separate functions
diff --git a/changelog.d/3626.bugfix b/changelog.d/3626.bugfix
deleted file mode 100644
index 9a4b878986..0000000000
--- a/changelog.d/3626.bugfix
+++ /dev/null
@@ -1 +0,0 @@
-Only import secrets when available (fix for py < 3.6)
diff --git a/changelog.d/3628.misc b/changelog.d/3628.misc
deleted file mode 100644
index 1aebefbe18..0000000000
--- a/changelog.d/3628.misc
+++ /dev/null
@@ -1 +0,0 @@
-Remove unused field "pdu_failures" from transactions.
diff --git a/changelog.d/3630.feature b/changelog.d/3630.feature
deleted file mode 100644
index 8007a04840..0000000000
--- a/changelog.d/3630.feature
+++ /dev/null
@@ -1 +0,0 @@
-Add ability to limit number of monthly active users on the server
diff --git a/changelog.d/3632.misc b/changelog.d/3632.misc
new file mode 100644
index 0000000000..9d64bbe83b
--- /dev/null
+++ b/changelog.d/3632.misc
@@ -0,0 +1 @@
+Refactor HTTP replication endpoints to reduce code duplication
diff --git a/changelog.d/3634.misc b/changelog.d/3634.misc
deleted file mode 100644
index 2cd6af91ff..0000000000
--- a/changelog.d/3634.misc
+++ /dev/null
@@ -1 +0,0 @@
-rename replication_layer to federation_client
diff --git a/changelog.d/3638.misc b/changelog.d/3638.misc
deleted file mode 100644
index 9faab15cf2..0000000000
--- a/changelog.d/3638.misc
+++ /dev/null
@@ -1 +0,0 @@
-Factor out exception handling in federation_client
diff --git a/changelog.d/3639.feature b/changelog.d/3639.feature
deleted file mode 100644
index c8c387e219..0000000000
--- a/changelog.d/3639.feature
+++ /dev/null
@@ -1 +0,0 @@
-When we fail to join a room over federation, pass the error code back to the client.
\ No newline at end of file
diff --git a/changelog.d/3644.misc b/changelog.d/3644.misc
deleted file mode 100644
index 2347fc8500..0000000000
--- a/changelog.d/3644.misc
+++ /dev/null
@@ -1 +0,0 @@
-Refactor location of docker build script.
diff --git a/changelog.d/3645.misc b/changelog.d/3645.misc
deleted file mode 100644
index 0fe6b28da1..0000000000
--- a/changelog.d/3645.misc
+++ /dev/null
@@ -1 +0,0 @@
-Update CONTRIBUTING to mention newsfragments.
diff --git a/changelog.d/3647.misc b/changelog.d/3647.misc
new file mode 100644
index 0000000000..dbc66dae60
--- /dev/null
+++ b/changelog.d/3647.misc
@@ -0,0 +1 @@
+Tests now correctly execute on Python 3.
diff --git a/changelog.d/3654.feature b/changelog.d/3654.feature
new file mode 100644
index 0000000000..35c95580bc
--- /dev/null
+++ b/changelog.d/3654.feature
@@ -0,0 +1 @@
+Basic support for room versioning
diff --git a/changelog.d/3664.feature b/changelog.d/3664.feature
new file mode 100644
index 0000000000..184dde9939
--- /dev/null
+++ b/changelog.d/3664.feature
@@ -0,0 +1 @@
+Add some metrics for the appservice and federation event sending loops
diff --git a/pyproject.toml b/pyproject.toml
index f4d6f0c6bb..dd099dc9c8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -2,7 +2,7 @@
package = "synapse"
filename = "CHANGES.md"
directory = "changelog.d"
- issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue}>)"
+ issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue})"
[[tool.towncrier.type]]
directory = "feature"
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 1810cb6fcd..a14d578e36 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -17,4 +17,4 @@
""" This is a reference implementation of a Matrix home server.
"""
-__version__ = "0.33.1"
+__version__ = "0.33.2"
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 4df930c8d1..b0da506f6d 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
# Copyright 2017 Vector Creations Ltd
+# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -94,3 +95,11 @@ class RoomCreationPreset(object):
class ThirdPartyEntityKind(object):
USER = "user"
LOCATION = "location"
+
+
+# the version we will give rooms which are created on this server
+DEFAULT_ROOM_VERSION = "1"
+
+# vdh-test-version is a placeholder to get room versioning support working and tested
+# until we have a working v2.
+KNOWN_ROOM_VERSIONS = {"1", "vdh-test-version"}
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index b41d595059..70400347bc 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -56,6 +57,8 @@ class Codes(object):
CONSENT_NOT_GIVEN = "M_CONSENT_NOT_GIVEN"
CANNOT_LEAVE_SERVER_NOTICE_ROOM = "M_CANNOT_LEAVE_SERVER_NOTICE_ROOM"
MAU_LIMIT_EXCEEDED = "M_MAU_LIMIT_EXCEEDED"
+ UNSUPPORTED_ROOM_VERSION = "M_UNSUPPORTED_ROOM_VERSION"
+ INCOMPATIBLE_ROOM_VERSION = "M_INCOMPATIBLE_ROOM_VERSION"
class CodeMessageException(RuntimeError):
@@ -285,6 +288,27 @@ class LimitExceededError(SynapseError):
)
+class IncompatibleRoomVersionError(SynapseError):
+ """A server is trying to join a room whose version it does not support."""
+
+ def __init__(self, room_version):
+ super(IncompatibleRoomVersionError, self).__init__(
+ code=400,
+ msg="Your homeserver does not support the features required to "
+ "join this room",
+ errcode=Codes.INCOMPATIBLE_ROOM_VERSION,
+ )
+
+ self._room_version = room_version
+
+ def error_dict(self):
+ return cs_error(
+ self.msg,
+ self.errcode,
+ room_version=self._room_version,
+ )
+
+
def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
""" Utility method for constructing an error response for client-server
interactions.
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index b32f64e729..6baeccca38 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -20,7 +20,7 @@ from signedjson.key import decode_verify_key_bytes
from signedjson.sign import SignatureVerifyException, verify_signed_json
from unpaddedbase64 import decode_base64
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, JoinRules, Membership
from synapse.api.errors import AuthError, EventSizeError, SynapseError
from synapse.types import UserID, get_domain_from_id
@@ -83,6 +83,14 @@ def check(event, auth_events, do_sig_check=True, do_size_check=True):
403,
"Creation event's room_id domain does not match sender's"
)
+
+ room_version = event.content.get("room_version", "1")
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ raise AuthError(
+ 403,
+ "room appears to have unsupported version %s" % (
+ room_version,
+ ))
# FIXME
logger.debug("Allowing! %s", event)
return
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7550e11b6e..c9f3c2d352 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,7 +25,7 @@ from prometheus_client import Counter
from twisted.internet import defer
-from synapse.api.constants import Membership
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
from synapse.api.errors import (
CodeMessageException,
FederationDeniedError,
@@ -518,10 +518,10 @@ class FederationClient(FederationBase):
description, destination, exc_info=1,
)
- raise RuntimeError("Failed to %s via any server", description)
+ raise RuntimeError("Failed to %s via any server" % (description, ))
def make_membership_event(self, destinations, room_id, user_id, membership,
- content={},):
+ content, params):
"""
Creates an m.room.member event, with context, without participating in the room.
@@ -537,8 +537,10 @@ class FederationClient(FederationBase):
user_id (str): The user whose membership is being evented.
membership (str): The "membership" property of the event. Must be
one of "join" or "leave".
- content (object): Any additional data to put into the content field
+ content (dict): Any additional data to put into the content field
of the event.
+ params (dict[str, str|Iterable[str]]): Query parameters to include in the
+ request.
Return:
Deferred: resolves to a tuple of (origin (str), event (object))
where origin is the remote homeserver which generated the event.
@@ -558,10 +560,12 @@ class FederationClient(FederationBase):
@defer.inlineCallbacks
def send_request(destination):
ret = yield self.transport_layer.make_membership_event(
- destination, room_id, user_id, membership
+ destination, room_id, user_id, membership, params,
)
- pdu_dict = ret["event"]
+ pdu_dict = ret.get("event", None)
+ if not isinstance(pdu_dict, dict):
+ raise InvalidResponseError("Bad 'event' field in response")
logger.debug("Got response to make_%s: %s", membership, pdu_dict)
@@ -605,6 +609,26 @@ class FederationClient(FederationBase):
Fails with a ``RuntimeError`` if no servers were reachable.
"""
+ def check_authchain_validity(signed_auth_chain):
+ for e in signed_auth_chain:
+ if e.type == EventTypes.Create:
+ create_event = e
+ break
+ else:
+ raise InvalidResponseError(
+ "no %s in auth chain" % (EventTypes.Create,),
+ )
+
+ # the room version should be sane.
+ room_version = create_event.content.get("room_version", "1")
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ # This shouldn't be possible, because the remote server should have
+ # rejected the join attempt during make_join.
+ raise InvalidResponseError(
+ "room appears to have unsupported version %s" % (
+ room_version,
+ ))
+
@defer.inlineCallbacks
def send_request(destination):
time_now = self._clock.time_msec()
@@ -661,7 +685,7 @@ class FederationClient(FederationBase):
for s in signed_state:
s.internal_metadata = copy.deepcopy(s.internal_metadata)
- auth_chain.sort(key=lambda e: e.depth)
+ check_authchain_validity(signed_auth)
defer.returnValue({
"state": signed_state,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index bf89d568af..2b62f687b6 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -27,7 +27,13 @@ from twisted.internet.abstract import isIPAddress
from twisted.python import failure
from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
+from synapse.api.errors import (
+ AuthError,
+ FederationError,
+ IncompatibleRoomVersionError,
+ NotFoundError,
+ SynapseError,
+)
from synapse.crypto.event_signing import compute_event_signature
from synapse.federation.federation_base import FederationBase, event_from_pdu_json
from synapse.federation.persistence import TransactionActions
@@ -323,12 +329,21 @@ class FederationServer(FederationBase):
defer.returnValue((200, resp))
@defer.inlineCallbacks
- def on_make_join_request(self, origin, room_id, user_id):
+ def on_make_join_request(self, origin, room_id, user_id, supported_versions):
origin_host, _ = parse_server_name(origin)
yield self.check_server_matches_acl(origin_host, room_id)
+
+ room_version = yield self.store.get_room_version(room_id)
+ if room_version not in supported_versions:
+ logger.warn("Room version %s not in %s", room_version, supported_versions)
+ raise IncompatibleRoomVersionError(room_version=room_version)
+
pdu = yield self.handler.on_make_join_request(room_id, user_id)
time_now = self._clock.time_msec()
- defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+ defer.returnValue({
+ "event": pdu.get_pdu_json(time_now),
+ "room_version": room_version,
+ })
@defer.inlineCallbacks
def on_invite_request(self, origin, content):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 78f9d40a3a..f603c8a368 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
from synapse.metrics import (
LaterGauge,
+ event_processing_loop_counter,
+ event_processing_loop_room_count,
events_processed_counter,
sent_edus_counter,
sent_transactions_counter,
@@ -253,7 +255,13 @@ class TransactionQueue(object):
synapse.metrics.event_processing_last_ts.labels(
"federation_sender").set(ts)
- events_processed_counter.inc(len(events))
+ events_processed_counter.inc(len(events))
+
+ event_processing_loop_room_count.labels(
+ "federation_sender"
+ ).inc(len(events_by_room))
+
+ event_processing_loop_counter.labels("federation_sender").inc()
synapse.metrics.event_processing_positions.labels(
"federation_sender").set(next_token)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4529d454af..b4fbe2c9d5 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -195,7 +195,7 @@ class TransportLayerClient(object):
@defer.inlineCallbacks
@log_function
- def make_membership_event(self, destination, room_id, user_id, membership):
+ def make_membership_event(self, destination, room_id, user_id, membership, params):
"""Asks a remote server to build and sign us a membership event
Note that this does not append any events to any graphs.
@@ -205,6 +205,8 @@ class TransportLayerClient(object):
room_id (str): room to join/leave
user_id (str): user to be joined/left
membership (str): one of join/leave
+ params (dict[str, str|Iterable[str]]): Query parameters to include in the
+ request.
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
@@ -241,6 +243,7 @@ class TransportLayerClient(object):
content = yield self.client.get_json(
destination=destination,
path=path,
+ args=params,
retry_on_dns_fail=retry_on_dns_fail,
timeout=20000,
ignore_backoff=ignore_backoff,
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index eae5f2b427..77969a4f38 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):
class BaseFederationServlet(object):
+ """Abstract base class for federation servlet classes.
+
+ The servlet object should have a PATH attribute which takes the form of a regexp to
+ match against the request path (excluding the /federation/v1 prefix).
+
+ The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
+ the appropriate HTTP method. These methods have the signature:
+
+ on_<METHOD>(self, origin, content, query, **kwargs)
+
+ With arguments:
+
+ origin (unicode|None): The authenticated server_name of the calling server,
+ unless REQUIRE_AUTH is set to False and authentication failed.
+
+ content (unicode|None): decoded json body of the request. None if the
+ request was a GET.
+
+ query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
+ (ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
+ yet.
+
+ **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+ components as specified in the path match regexp.
+
+ Returns:
+ Deferred[(int, object)|None]: either (response code, response object) to
+ return a JSON response, or None if the request has already been handled.
+
+ Raises:
+ SynapseError: to return an error code
+
+ Exception: other exceptions will be caught, logged, and a 500 will be
+ returned.
+ """
REQUIRE_AUTH = True
def __init__(self, handler, authenticator, ratelimiter, server_name):
@@ -204,6 +239,18 @@ class BaseFederationServlet(object):
@defer.inlineCallbacks
@functools.wraps(func)
def new_func(request, *args, **kwargs):
+ """ A callback which can be passed to HttpServer.RegisterPaths
+
+ Args:
+ request (twisted.web.http.Request):
+ *args: unused?
+ **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+ components as specified in the path match regexp.
+
+ Returns:
+ Deferred[(int, object)|None]: (response code, response object) as returned
+ by the callback method. None if the request has already been handled.
+ """
content = None
if request.method in ["PUT", "POST"]:
# TODO: Handle other method types? other content types?
@@ -384,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet):
PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
@defer.inlineCallbacks
- def on_GET(self, origin, content, query, context, user_id):
+ def on_GET(self, origin, _content, query, context, user_id):
+ """
+ Args:
+ origin (unicode): The authenticated server_name of the calling server
+
+ _content (None): (GETs don't have bodies)
+
+ query (dict[bytes, list[bytes]]): Query params from the request.
+
+ **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+ components as specified in the path match regexp.
+
+ Returns:
+ Deferred[(int, object)|None]: either (response code, response object) to
+ return a JSON response, or None if the request has already been handled.
+ """
+ versions = query.get(b'ver')
+ if versions is not None:
+ supported_versions = [v.decode("utf-8") for v in versions]
+ else:
+ supported_versions = ["1"]
+
content = yield self.handler.on_make_join_request(
origin, context, user_id,
+ supported_versions=supported_versions,
)
defer.returnValue((200, content))
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ee41aed69e..f0f89af7dc 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -23,6 +23,10 @@ from twisted.internet import defer
import synapse
from synapse.api.constants import EventTypes
+from synapse.metrics import (
+ event_processing_loop_counter,
+ event_processing_loop_room_count,
+)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.logcontext import make_deferred_yieldable, run_in_background
from synapse.util.metrics import Measure
@@ -136,6 +140,12 @@ class ApplicationServicesHandler(object):
events_processed_counter.inc(len(events))
+ event_processing_loop_room_count.labels(
+ "appservice_sender"
+ ).inc(len(events_by_room))
+
+ event_processing_loop_counter.labels("appservice_sender").inc()
+
synapse.metrics.event_processing_lag.labels(
"appservice_sender").set(now - ts)
synapse.metrics.event_processing_last_ts.labels(
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 533b82c783..0dffd44e22 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -30,7 +30,12 @@ from unpaddedbase64 import decode_base64
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ Membership,
+ RejectedReason,
+)
from synapse.api.errors import (
AuthError,
CodeMessageException,
@@ -922,6 +927,9 @@ class FederationHandler(BaseHandler):
joinee,
"join",
content,
+ params={
+ "ver": KNOWN_ROOM_VERSIONS,
+ },
)
# This shouldn't happen, because the RoomMemberHandler has a
@@ -1187,13 +1195,14 @@ class FederationHandler(BaseHandler):
@defer.inlineCallbacks
def _make_and_verify_event(self, target_hosts, room_id, user_id, membership,
- content={},):
+ content={}, params=None):
origin, pdu = yield self.federation_client.make_membership_event(
target_hosts,
room_id,
user_id,
membership,
content,
+ params=params,
)
logger.debug("Got response to make_%s: %s", membership, pdu)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 39d7724778..bcb093ba3e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder
from synapse.crypto.event_signing import add_hashes_and_signatures
from synapse.events.utils import serialize_event
from synapse.events.validator import EventValidator
-from synapse.replication.http.send_event import send_event_to_master
+from synapse.replication.http.send_event import ReplicationSendEventRestServlet
from synapse.types import RoomAlias, UserID
from synapse.util.async import Linearizer
from synapse.util.frozenutils import frozendict_json_encoder
@@ -171,7 +171,7 @@ class EventCreationHandler(object):
self.notifier = hs.get_notifier()
self.config = hs.config
- self.http_client = hs.get_simple_http_client()
+ self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs)
# This is only used to get at ratelimit function, and maybe_kick_guest_users
self.base_handler = BaseHandler(hs)
@@ -559,12 +559,9 @@ class EventCreationHandler(object):
try:
# If we're a worker we need to hit out to the master.
if self.config.worker_app:
- yield send_event_to_master(
- clock=self.hs.get_clock(),
+ yield self.send_event_to_master(
+ event_id=event.event_id,
store=self.store,
- client=self.http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
requester=requester,
event=event,
context=context,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b7804d9b2..6a17c42238 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -21,9 +21,17 @@ import math
import string
from collections import OrderedDict
+from six import string_types
+
from twisted.internet import defer
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+ DEFAULT_ROOM_VERSION,
+ KNOWN_ROOM_VERSIONS,
+ EventTypes,
+ JoinRules,
+ RoomCreationPreset,
+)
from synapse.api.errors import AuthError, Codes, StoreError, SynapseError
from synapse.types import RoomAlias, RoomID, RoomStreamToken, StreamToken, UserID
from synapse.util import stringutils
@@ -99,6 +107,21 @@ class RoomCreationHandler(BaseHandler):
if ratelimit:
yield self.ratelimit(requester)
+ room_version = config.get("room_version", DEFAULT_ROOM_VERSION)
+ if not isinstance(room_version, string_types):
+ raise SynapseError(
+ 400,
+ "room_version must be a string",
+ Codes.BAD_JSON,
+ )
+
+ if room_version not in KNOWN_ROOM_VERSIONS:
+ raise SynapseError(
+ 400,
+ "Your homeserver does not support this room version",
+ Codes.UNSUPPORTED_ROOM_VERSION,
+ )
+
if "room_alias_name" in config:
for wchar in string.whitespace:
if wchar in config["room_alias_name"]:
@@ -184,6 +207,9 @@ class RoomCreationHandler(BaseHandler):
creation_content = config.get("creation_content", {})
+ # override any attempt to set room versions via the creation_content
+ creation_content["room_version"] = room_version
+
room_member_handler = self.hs.get_room_member_handler()
yield self._send_events_for_new_room(
diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py
index 22d8b4b0d3..acc6eb8099 100644
--- a/synapse/handlers/room_member_worker.py
+++ b/synapse/handlers/room_member_worker.py
@@ -20,16 +20,24 @@ from twisted.internet import defer
from synapse.api.errors import SynapseError
from synapse.handlers.room_member import RoomMemberHandler
from synapse.replication.http.membership import (
- get_or_register_3pid_guest,
- notify_user_membership_change,
- remote_join,
- remote_reject_invite,
+ ReplicationRegister3PIDGuestRestServlet as Repl3PID,
+ ReplicationRemoteJoinRestServlet as ReplRemoteJoin,
+ ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite,
+ ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft,
)
logger = logging.getLogger(__name__)
class RoomMemberWorkerHandler(RoomMemberHandler):
+ def __init__(self, hs):
+ super(RoomMemberWorkerHandler, self).__init__(hs)
+
+ self._get_register_3pid_client = Repl3PID.make_client(hs)
+ self._remote_join_client = ReplRemoteJoin.make_client(hs)
+ self._remote_reject_client = ReplRejectInvite.make_client(hs)
+ self._notify_change_client = ReplJoinedLeft.make_client(hs)
+
@defer.inlineCallbacks
def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
"""Implements RoomMemberHandler._remote_join
@@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
if len(remote_room_hosts) == 0:
raise SynapseError(404, "No known servers")
- ret = yield remote_join(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ ret = yield self._remote_join_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
"""Implements RoomMemberHandler._remote_reject_invite
"""
- return remote_reject_invite(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._remote_reject_client(
requester=requester,
remote_room_hosts=remote_room_hosts,
room_id=room_id,
@@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_joined_room(self, target, room_id):
"""Implements RoomMemberHandler._user_joined_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="joined",
@@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def _user_left_room(self, target, room_id):
"""Implements RoomMemberHandler._user_left_room
"""
- return notify_user_membership_change(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._notify_change_client(
user_id=target.to_string(),
room_id=room_id,
change="left",
@@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler):
def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id):
"""Implements RoomMemberHandler.get_or_register_3pid_guest
"""
- return get_or_register_3pid_guest(
- self.simple_http_client,
- host=self.config.worker_replication_host,
- port=self.config.worker_replication_http_port,
+ return self._get_register_3pid_client(
requester=requester,
medium=medium,
address=address,
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index bf1aa29502..b3f5415aa6 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -439,7 +439,7 @@ class MatrixFederationHttpClient(object):
defer.returnValue(json.loads(body))
@defer.inlineCallbacks
- def get_json(self, destination, path, args={}, retry_on_dns_fail=True,
+ def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
timeout=None, ignore_backoff=False):
""" GETs some json from the given host homeserver and path
@@ -447,7 +447,7 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
- args (dict): A dictionary used to create query strings, defaults to
+ args (dict|None): A dictionary used to create query strings, defaults to
None.
timeout (int): How long to try (in ms) the destination for before
giving up. None indicates no timeout and that the request will
@@ -702,6 +702,9 @@ def check_content_type_is_json(headers):
def encode_query_args(args):
+ if args is None:
+ return b""
+
encoded_args = {}
for k, vs in args.items():
if isinstance(vs, string_types):
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a9158fc066..550f8443f7 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
+event_processing_loop_counter = Counter(
+ "synapse_event_processing_loop_count",
+ "Event processing loop iterations",
+ ["name"],
+)
+
+event_processing_loop_room_count = Counter(
+ "synapse_event_processing_loop_room_count",
+ "Rooms seen per event processing loop iteration",
+ ["name"],
+)
+
+
# Used to track where various components have processed in the event stream,
# e.g. federation sending, appservice sending, etc.
event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
new file mode 100644
index 0000000000..5e5376cf58
--- /dev/null
+++ b/synapse/replication/http/_base.py
@@ -0,0 +1,215 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import abc
+import logging
+import re
+
+from six.moves import urllib
+
+from twisted.internet import defer
+
+from synapse.api.errors import CodeMessageException, HttpResponseException
+from synapse.util.caches.response_cache import ResponseCache
+from synapse.util.stringutils import random_string
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationEndpoint(object):
+ """Helper base class for defining new replication HTTP endpoints.
+
+ This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..`
+ (with an `/:txn_id` prefix for cached requests.), where NAME is a name,
+ PATH_ARGS are a tuple of parameters to be encoded in the URL.
+
+ For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`,
+ with `CACHE` set to true then this generates an endpoint:
+
+ /_synapse/replication/send_event/:event_id/:txn_id
+
+ For POST/PUT requests the payload is serialized to json and sent as the
+ body, while for GET requests the payload is added as query parameters. See
+ `_serialize_payload` for details.
+
+ Incoming requests are handled by overriding `_handle_request`. Servers
+ must call `register` to register the path with the HTTP server.
+
+ Requests can be sent by calling the client returned by `make_client`.
+
+ Attributes:
+ NAME (str): A name for the endpoint, added to the path as well as used
+ in logging and metrics.
+ PATH_ARGS (tuple[str]): A list of parameters to be added to the path.
+ Adding parameters to the path (rather than payload) can make it
+ easier to follow along in the log files.
+ METHOD (str): The method of the HTTP request, defaults to POST. Can be
+ one of POST, PUT or GET. If GET then the payload is sent as query
+ parameters rather than a JSON body.
+ CACHE (bool): Whether server should cache the result of the request/
+ If true then transparently adds a txn_id to all requests, and
+ `_handle_request` must return a Deferred.
+ RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
+ is received.
+ """
+
+ __metaclass__ = abc.ABCMeta
+
+ NAME = abc.abstractproperty()
+ PATH_ARGS = abc.abstractproperty()
+
+ METHOD = "POST"
+ CACHE = True
+ RETRY_ON_TIMEOUT = True
+
+ def __init__(self, hs):
+ if self.CACHE:
+ self.response_cache = ResponseCache(
+ hs, "repl." + self.NAME,
+ timeout_ms=30 * 60 * 1000,
+ )
+
+ assert self.METHOD in ("PUT", "POST", "GET")
+
+ @abc.abstractmethod
+ def _serialize_payload(**kwargs):
+ """Static method that is called when creating a request.
+
+ Concrete implementations should have explicit parameters (rather than
+ kwargs) so that an appropriate exception is raised if the client is
+ called with unexpected parameters. All PATH_ARGS must appear in
+ argument list.
+
+ Returns:
+ Deferred[dict]|dict: If POST/PUT request then dictionary must be
+ JSON serialisable, otherwise must be appropriate for adding as
+ query args.
+ """
+ return {}
+
+ @abc.abstractmethod
+ def _handle_request(self, request, **kwargs):
+ """Handle incoming request.
+
+ This is called with the request object and PATH_ARGS.
+
+ Returns:
+ Deferred[dict]: A JSON serialisable dict to be used as response
+ body of request.
+ """
+ pass
+
+ @classmethod
+ def make_client(cls, hs):
+ """Create a client that makes requests.
+
+ Returns a callable that accepts the same parameters as `_serialize_payload`.
+ """
+ clock = hs.get_clock()
+ host = hs.config.worker_replication_host
+ port = hs.config.worker_replication_http_port
+
+ client = hs.get_simple_http_client()
+
+ @defer.inlineCallbacks
+ def send_request(**kwargs):
+ data = yield cls._serialize_payload(**kwargs)
+
+ url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS]
+
+ if cls.CACHE:
+ txn_id = random_string(10)
+ url_args.append(txn_id)
+
+ if cls.METHOD == "POST":
+ request_func = client.post_json_get_json
+ elif cls.METHOD == "PUT":
+ request_func = client.put_json
+ elif cls.METHOD == "GET":
+ request_func = client.get_json
+ else:
+ # We have already asserted in the constructor that a
+ # compatible was picked, but lets be paranoid.
+ raise Exception(
+ "Unknown METHOD on %s replication endpoint" % (cls.NAME,)
+ )
+
+ uri = "http://%s:%s/_synapse/replication/%s/%s" % (
+ host, port, cls.NAME, "/".join(url_args)
+ )
+
+ try:
+ # We keep retrying the same request for timeouts. This is so that we
+ # have a good idea that the request has either succeeded or failed on
+ # the master, and so whether we should clean up or not.
+ while True:
+ try:
+ result = yield request_func(uri, data)
+ break
+ except CodeMessageException as e:
+ if e.code != 504 or not cls.RETRY_ON_TIMEOUT:
+ raise
+
+ logger.warn("%s request timed out", cls.NAME)
+
+ # If we timed out we probably don't need to worry about backing
+ # off too much, but lets just wait a little anyway.
+ yield clock.sleep(1)
+ except HttpResponseException as e:
+ # We convert to SynapseError as we know that it was a SynapseError
+ # on the master process that we should send to the client. (And
+ # importantly, not stack traces everywhere)
+ raise e.to_synapse_error()
+
+ defer.returnValue(result)
+
+ return send_request
+
+ def register(self, http_server):
+ """Called by the server to register this as a handler to the
+ appropriate path.
+ """
+
+ url_args = list(self.PATH_ARGS)
+ handler = self._handle_request
+ method = self.METHOD
+
+ if self.CACHE:
+ handler = self._cached_handler
+ url_args.append("txn_id")
+
+ args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args)
+ pattern = re.compile("^/_synapse/replication/%s/%s$" % (
+ self.NAME,
+ args
+ ))
+
+ http_server.register_paths(method, [pattern], handler)
+
+ def _cached_handler(self, request, txn_id, **kwargs):
+ """Called on new incoming requests when caching is enabled. Checks
+ if there is a cached response for the request and returns that,
+ otherwise calls `_handle_request` and caches its response.
+ """
+ # We just use the txn_id here, but we probably also want to use the
+ # other PATH_ARGS as well.
+
+ assert self.CACHE
+
+ return self.response_cache.wrap(
+ txn_id,
+ self._handle_request,
+ request, **kwargs
+ )
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index 7a3cfb159c..e58bebf12a 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -14,182 +14,63 @@
# limitations under the License.
import logging
-import re
from twisted.internet import defer
-from synapse.api.errors import HttpResponseException
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
from synapse.util.distributor import user_joined_room, user_left_room
logger = logging.getLogger(__name__)
-@defer.inlineCallbacks
-def remote_join(client, host, port, requester, remote_room_hosts,
- room_id, user_id, content):
- """Ask the master to do a remote join for the given user to the given room
+class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
+ """Does a remote join for the given user to the given room
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- remote_room_hosts (list[str]): Servers to try and join via
- room_id (str)
- user_id (str)
- content (dict): The event content to use for the join event
+ Request format:
- Returns:
- Deferred
- """
- uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port)
-
- payload = {
- "requester": requester.serialize(),
- "remote_room_hosts": remote_room_hosts,
- "room_id": room_id,
- "user_id": user_id,
- "content": content,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def remote_reject_invite(client, host, port, requester, remote_room_hosts,
- room_id, user_id):
- """Ask master to reject the invite for the user and room.
-
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- remote_room_hosts (list[str]): Servers to try and reject via
- room_id (str)
- user_id (str)
-
- Returns:
- Deferred
- """
- uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port)
-
- payload = {
- "requester": requester.serialize(),
- "remote_room_hosts": remote_room_hosts,
- "room_id": room_id,
- "user_id": user_id,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def get_or_register_3pid_guest(client, host, port, requester,
- medium, address, inviter_user_id):
- """Ask the master to get/create a guest account for given 3PID.
-
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- medium (str)
- address (str)
- inviter_user_id (str): The user ID who is trying to invite the
- 3PID
-
- Returns:
- Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the
- 3PID guest account.
- """
+ POST /_synapse/replication/remote_join/:room_id/:user_id
- uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port)
-
- payload = {
- "requester": requester.serialize(),
- "medium": medium,
- "address": address,
- "inviter_user_id": inviter_user_id,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-@defer.inlineCallbacks
-def notify_user_membership_change(client, host, port, user_id, room_id, change):
- """Notify master that a user has joined or left the room
-
- Args:
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication.
- user_id (str)
- room_id (str)
- change (str): Either "join" or "left"
-
- Returns:
- Deferred
+ {
+ "requester": ...,
+ "remote_room_hosts": [...],
+ "content": { ... }
+ }
"""
- assert change in ("joined", "left")
-
- uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change)
-
- payload = {
- "user_id": user_id,
- "room_id": room_id,
- }
-
- try:
- result = yield client.post_json_get_json(uri, payload)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-class ReplicationRemoteJoinRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/remote_join$")]
+ NAME = "remote_join"
+ PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs):
- super(ReplicationRemoteJoinRestServlet, self).__init__()
+ super(ReplicationRemoteJoinRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ @staticmethod
+ def _serialize_payload(requester, room_id, user_id, remote_room_hosts,
+ content):
+ """
+ Args:
+ requester(Requester)
+ room_id (str)
+ user_id (str)
+ remote_room_hosts (list[str]): Servers to try and join via
+ content(dict): The event content to use for the join event
+ """
+ return {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ "content": content,
+ }
+
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
- room_id = content["room_id"]
- user_id = content["user_id"]
event_content = content["content"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -212,23 +93,48 @@ class ReplicationRemoteJoinRestServlet(RestServlet):
defer.returnValue((200, {}))
-class ReplicationRemoteRejectInviteRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")]
+class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
+ """Rejects the invite for the user and room.
+
+ Request format:
+
+ POST /_synapse/replication/remote_reject_invite/:room_id/:user_id
+
+ {
+ "requester": ...,
+ "remote_room_hosts": [...],
+ }
+ """
+
+ NAME = "remote_reject_invite"
+ PATH_ARGS = ("room_id", "user_id",)
def __init__(self, hs):
- super(ReplicationRemoteRejectInviteRestServlet, self).__init__()
+ super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs)
self.federation_handler = hs.get_handlers().federation_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ @staticmethod
+ def _serialize_payload(requester, room_id, user_id, remote_room_hosts):
+ """
+ Args:
+ requester(Requester)
+ room_id (str)
+ user_id (str)
+ remote_room_hosts (list[str]): Servers to try and reject via
+ """
+ return {
+ "requester": requester.serialize(),
+ "remote_room_hosts": remote_room_hosts,
+ }
+
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request, room_id, user_id):
content = parse_json_object_from_request(request)
remote_room_hosts = content["remote_room_hosts"]
- room_id = content["room_id"]
- user_id = content["user_id"]
requester = Requester.deserialize(self.store, content["requester"])
@@ -264,18 +170,50 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet):
defer.returnValue((200, ret))
-class ReplicationRegister3PIDGuestRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")]
+class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint):
+ """Gets/creates a guest account for given 3PID.
+
+ Request format:
+
+ POST /_synapse/replication/get_or_register_3pid_guest/
+
+ {
+ "requester": ...,
+ "medium": ...,
+ "address": ...,
+ "inviter_user_id": ...
+ }
+ """
+
+ NAME = "get_or_register_3pid_guest"
+ PATH_ARGS = ()
def __init__(self, hs):
- super(ReplicationRegister3PIDGuestRestServlet, self).__init__()
+ super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
+ @staticmethod
+ def _serialize_payload(requester, medium, address, inviter_user_id):
+ """
+ Args:
+ requester(Requester)
+ medium (str)
+ address (str)
+ inviter_user_id (str): The user ID who is trying to invite the
+ 3PID
+ """
+ return {
+ "requester": requester.serialize(),
+ "medium": medium,
+ "address": address,
+ "inviter_user_id": inviter_user_id,
+ }
+
@defer.inlineCallbacks
- def on_POST(self, request):
+ def _handle_request(self, request):
content = parse_json_object_from_request(request)
medium = content["medium"]
@@ -296,23 +234,41 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet):
defer.returnValue((200, ret))
-class ReplicationUserJoinedLeftRoomRestServlet(RestServlet):
- PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")]
+class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint):
+ """Notifies that a user has joined or left the room
+
+ Request format:
+
+ POST /_synapse/replication/membership_change/:room_id/:user_id/:change
+
+ {}
+ """
+
+ NAME = "membership_change"
+ PATH_ARGS = ("room_id", "user_id", "change")
+ CACHE = False # No point caching as should return instantly.
def __init__(self, hs):
- super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__()
+ super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs)
self.registeration_handler = hs.get_handlers().registration_handler
self.store = hs.get_datastore()
self.clock = hs.get_clock()
self.distributor = hs.get_distributor()
- def on_POST(self, request, change):
- content = parse_json_object_from_request(request)
+ @staticmethod
+ def _serialize_payload(room_id, user_id, change):
+ """
+ Args:
+ room_id (str)
+ user_id (str)
+ change (str): Either "joined" or "left"
+ """
+ assert change in ("joined", "left",)
- user_id = content["user_id"]
- room_id = content["room_id"]
+ return {}
+ def _handle_request(self, request, room_id, user_id, change):
logger.info("user membership change: %s in %s", user_id, room_id)
user = UserID.from_string(user_id)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index d3509dc288..5b52c91650 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -14,86 +14,26 @@
# limitations under the License.
import logging
-import re
from twisted.internet import defer
-from synapse.api.errors import CodeMessageException, HttpResponseException
from synapse.events import FrozenEvent
from synapse.events.snapshot import EventContext
-from synapse.http.servlet import RestServlet, parse_json_object_from_request
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
from synapse.types import Requester, UserID
-from synapse.util.caches.response_cache import ResponseCache
from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-@defer.inlineCallbacks
-def send_event_to_master(clock, store, client, host, port, requester, event, context,
- ratelimit, extra_users):
- """Send event to be handled on the master
-
- Args:
- clock (synapse.util.Clock)
- store (DataStore)
- client (SimpleHttpClient)
- host (str): host of master
- port (int): port on master listening for HTTP replication
- requester (Requester)
- event (FrozenEvent)
- context (EventContext)
- ratelimit (bool)
- extra_users (list(UserID)): Any extra users to notify about event
- """
- uri = "http://%s:%s/_synapse/replication/send_event/%s" % (
- host, port, event.event_id,
- )
-
- serialized_context = yield context.serialize(event, store)
-
- payload = {
- "event": event.get_pdu_json(),
- "internal_metadata": event.internal_metadata.get_dict(),
- "rejected_reason": event.rejected_reason,
- "context": serialized_context,
- "requester": requester.serialize(),
- "ratelimit": ratelimit,
- "extra_users": [u.to_string() for u in extra_users],
- }
-
- try:
- # We keep retrying the same request for timeouts. This is so that we
- # have a good idea that the request has either succeeded or failed on
- # the master, and so whether we should clean up or not.
- while True:
- try:
- result = yield client.put_json(uri, payload)
- break
- except CodeMessageException as e:
- if e.code != 504:
- raise
-
- logger.warn("send_event request timed out")
-
- # If we timed out we probably don't need to worry about backing
- # off too much, but lets just wait a little anyway.
- yield clock.sleep(1)
- except HttpResponseException as e:
- # We convert to SynapseError as we know that it was a SynapseError
- # on the master process that we should send to the client. (And
- # importantly, not stack traces everywhere)
- raise e.to_synapse_error()
- defer.returnValue(result)
-
-
-class ReplicationSendEventRestServlet(RestServlet):
+class ReplicationSendEventRestServlet(ReplicationEndpoint):
"""Handles events newly created on workers, including persisting and
notifying.
The API looks like:
- POST /_synapse/replication/send_event/:event_id
+ POST /_synapse/replication/send_event/:event_id/:txn_id
{
"event": { .. serialized event .. },
@@ -105,27 +45,47 @@ class ReplicationSendEventRestServlet(RestServlet):
"extra_users": [],
}
"""
- PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")]
+ NAME = "send_event"
+ PATH_ARGS = ("event_id",)
def __init__(self, hs):
- super(ReplicationSendEventRestServlet, self).__init__()
+ super(ReplicationSendEventRestServlet, self).__init__(hs)
self.event_creation_handler = hs.get_event_creation_handler()
self.store = hs.get_datastore()
self.clock = hs.get_clock()
- # The responses are tiny, so we may as well cache them for a while
- self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000)
+ @staticmethod
+ @defer.inlineCallbacks
+ def _serialize_payload(event_id, store, event, context, requester,
+ ratelimit, extra_users):
+ """
+ Args:
+ event_id (str)
+ store (DataStore)
+ requester (Requester)
+ event (FrozenEvent)
+ context (EventContext)
+ ratelimit (bool)
+ extra_users (list(UserID)): Any extra users to notify about event
+ """
+
+ serialized_context = yield context.serialize(event, store)
+
+ payload = {
+ "event": event.get_pdu_json(),
+ "internal_metadata": event.internal_metadata.get_dict(),
+ "rejected_reason": event.rejected_reason,
+ "context": serialized_context,
+ "requester": requester.serialize(),
+ "ratelimit": ratelimit,
+ "extra_users": [u.to_string() for u in extra_users],
+ }
- def on_PUT(self, request, event_id):
- return self.response_cache.wrap(
- event_id,
- self._handle_request,
- request
- )
+ defer.returnValue(payload)
@defer.inlineCallbacks
- def _handle_request(self, request):
+ def _handle_request(self, request, event_id):
with Measure(self.clock, "repl_send_event_parse"):
content = parse_json_object_from_request(request)
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index bdb5eee4af..4830c68f35 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -44,8 +44,8 @@ class SlavedEventStore(EventFederationWorkerStore,
RoomMemberWorkerStore,
EventPushActionsWorkerStore,
StreamWorkerStore,
- EventsWorkerStore,
StateGroupWorkerStore,
+ EventsWorkerStore,
SignatureWorkerStore,
UserErasureWorkerStore,
BaseSlavedStore):
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index b27b3ae144..17b14d464b 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -21,15 +21,17 @@ from six.moves import range
from twisted.internet import defer
+from synapse.api.constants import EventTypes
+from synapse.api.errors import NotFoundError
+from synapse.storage._base import SQLBaseStore
from synapse.storage.background_updates import BackgroundUpdateStore
from synapse.storage.engines import PostgresEngine
+from synapse.storage.events_worker import EventsWorkerStore
from synapse.util.caches import get_cache_factor_for, intern_string
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.dictionary_cache import DictionaryCache
from synapse.util.stringutils import to_ascii
-from ._base import SQLBaseStore
-
logger = logging.getLogger(__name__)
@@ -46,7 +48,8 @@ class _GetStateGroupDelta(namedtuple("_GetStateGroupDelta", ("prev_group", "delt
return len(self.delta_ids) if self.delta_ids else 0
-class StateGroupWorkerStore(SQLBaseStore):
+# this inherits from EventsWorkerStore because it calls self.get_events
+class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
"""The parts of StateGroupStore that can be called from workers.
"""
@@ -61,6 +64,30 @@ class StateGroupWorkerStore(SQLBaseStore):
"*stateGroupCache*", 500000 * get_cache_factor_for("stateGroupCache")
)
+ @defer.inlineCallbacks
+ def get_room_version(self, room_id):
+ """Get the room_version of a given room
+
+ Args:
+ room_id (str)
+
+ Returns:
+ Deferred[str]
+
+ Raises:
+ NotFoundError if the room is unknown
+ """
+ # for now we do this by looking at the create event. We may want to cache this
+ # more intelligently in future.
+ state_ids = yield self.get_current_state_ids(room_id)
+ create_id = state_ids.get((EventTypes.Create, ""))
+
+ if not create_id:
+ raise NotFoundError("Unknown room")
+
+ create_event = yield self.get_event(create_id)
+ defer.returnValue(create_event.content.get("room_version", "1"))
+
@cached(max_entries=100000, iterable=True)
def get_current_state_ids(self, room_id):
"""Get the current state event ids for a room based on the
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index 2c263af1a3..f422cf3c5a 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -48,7 +48,9 @@ def _expect_edu(destination, edu_type, content, origin="test"):
def _make_edu_json(origin, edu_type, content):
- return json.dumps(_expect_edu("test", edu_type, content, origin=origin))
+ return json.dumps(
+ _expect_edu("test", edu_type, content, origin=origin)
+ ).encode('utf8')
class TypingNotificationsTestCase(unittest.TestCase):
diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py
index 34e68ae82f..d46c27e7e9 100644
--- a/tests/rest/client/test_transactions.py
+++ b/tests/rest/client/test_transactions.py
@@ -85,7 +85,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
try:
yield self.cache.fetch_or_execute(self.mock_key, cb)
except Exception as e:
- self.assertEqual(e.message, "boo")
+ self.assertEqual(e.args[0], "boo")
self.assertIs(LoggingContext.current_context(), test_context)
res = yield self.cache.fetch_or_execute(self.mock_key, cb)
@@ -111,7 +111,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase):
try:
yield self.cache.fetch_or_execute(self.mock_key, cb)
except Exception as e:
- self.assertEqual(e.message, "boo")
+ self.assertEqual(e.args[0], "boo")
self.assertIs(LoggingContext.current_context(), test_context)
res = yield self.cache.fetch_or_execute(self.mock_key, cb)
diff --git a/tests/rest/client/v1/test_admin.py b/tests/rest/client/v1/test_admin.py
index 8c90145601..fb28883d30 100644
--- a/tests/rest/client/v1/test_admin.py
+++ b/tests/rest/client/v1/test_admin.py
@@ -140,7 +140,7 @@ class UserRegisterTestCase(unittest.TestCase):
"admin": True,
"mac": want_mac,
}
- ).encode('utf8')
+ )
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
@@ -168,7 +168,7 @@ class UserRegisterTestCase(unittest.TestCase):
"admin": True,
"mac": want_mac,
}
- ).encode('utf8')
+ )
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
@@ -195,7 +195,7 @@ class UserRegisterTestCase(unittest.TestCase):
"admin": True,
"mac": want_mac,
}
- ).encode('utf8')
+ )
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
@@ -253,7 +253,7 @@ class UserRegisterTestCase(unittest.TestCase):
self.assertEqual('Invalid username', channel.json_body["error"])
# Must not have null bytes
- body = json.dumps({"nonce": nonce(), "username": b"abcd\x00"})
+ body = json.dumps({"nonce": nonce(), "username": u"abcd\u0000"})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
@@ -289,7 +289,7 @@ class UserRegisterTestCase(unittest.TestCase):
self.assertEqual('Invalid password', channel.json_body["error"])
# Must not have null bytes
- body = json.dumps({"nonce": nonce(), "username": "a", "password": b"abcd\x00"})
+ body = json.dumps({"nonce": nonce(), "username": "a", "password": u"abcd\u0000"})
request, channel = make_request("POST", self.url, body.encode('utf8'))
render(request, self.resource, self.clock)
diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py
index d71cc8e0db..0516ce3cfb 100644
--- a/tests/rest/client/v1/test_profile.py
+++ b/tests/rest/client/v1/test_profile.py
@@ -80,7 +80,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger(
"PUT",
"/profile/%s/displayname" % (myid),
- '{"displayname": "Frank Jr."}'
+ b'{"displayname": "Frank Jr."}'
)
self.assertEquals(200, code)
@@ -95,7 +95,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger(
"PUT", "/profile/%s/displayname" % ("@4567:test"),
- '{"displayname": "Frank Jr."}'
+ b'{"displayname": "Frank Jr."}'
)
self.assertTrue(
@@ -122,7 +122,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger(
"PUT", "/profile/%s/displayname" % ("@opaque:elsewhere"),
- '{"displayname":"bob"}'
+ b'{"displayname":"bob"}'
)
self.assertTrue(
@@ -151,7 +151,7 @@ class ProfileTestCase(unittest.TestCase):
(code, response) = yield self.mock_resource.trigger(
"PUT",
"/profile/%s/avatar_url" % (myid),
- '{"avatar_url": "http://my.server/pic.gif"}'
+ b'{"avatar_url": "http://my.server/pic.gif"}'
)
self.assertEquals(200, code)
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 41de8e0762..e3bc5f378d 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -105,7 +105,7 @@ class RestTestCase(unittest.TestCase):
"password": "test",
"type": "m.login.password"
}))
- self.assertEquals(200, code)
+ self.assertEquals(200, code, msg=response)
defer.returnValue(response)
@defer.inlineCallbacks
@@ -149,14 +149,14 @@ class RestHelper(object):
def create_room_as(self, room_creator, is_public=True, tok=None):
temp_id = self.auth_user_id
self.auth_user_id = room_creator
- path = b"/_matrix/client/r0/createRoom"
+ path = "/_matrix/client/r0/createRoom"
content = {}
if not is_public:
content["visibility"] = "private"
if tok:
- path = path + b"?access_token=%s" % tok.encode('ascii')
+ path = path + "?access_token=%s" % tok
- request, channel = make_request(b"POST", path, json.dumps(content).encode('utf8'))
+ request, channel = make_request("POST", path, json.dumps(content).encode('utf8'))
request.render(self.resource)
wait_until_result(self.hs.get_reactor(), channel)
@@ -205,7 +205,7 @@ class RestHelper(object):
data = {"membership": membership}
request, channel = make_request(
- b"PUT", path.encode('ascii'), json.dumps(data).encode('utf8')
+ "PUT", path, json.dumps(data).encode('utf8')
)
request.render(self.resource)
diff --git a/tests/rest/client/v2_alpha/test_filter.py b/tests/rest/client/v2_alpha/test_filter.py
index e890f0feac..de33b10a5f 100644
--- a/tests/rest/client/v2_alpha/test_filter.py
+++ b/tests/rest/client/v2_alpha/test_filter.py
@@ -33,7 +33,7 @@ PATH_PREFIX = "/_matrix/client/v2_alpha"
class FilterTestCase(unittest.TestCase):
- USER_ID = b"@apple:test"
+ USER_ID = "@apple:test"
EXAMPLE_FILTER = {"room": {"timeline": {"types": ["m.room.message"]}}}
EXAMPLE_FILTER_JSON = b'{"room": {"timeline": {"types": ["m.room.message"]}}}'
TO_REGISTER = [filter]
@@ -72,8 +72,8 @@ class FilterTestCase(unittest.TestCase):
def test_add_filter(self):
request, channel = make_request(
- b"POST",
- b"/_matrix/client/r0/user/%s/filter" % (self.USER_ID),
+ "POST",
+ "/_matrix/client/r0/user/%s/filter" % (self.USER_ID),
self.EXAMPLE_FILTER_JSON,
)
request.render(self.resource)
@@ -87,8 +87,8 @@ class FilterTestCase(unittest.TestCase):
def test_add_filter_for_other_user(self):
request, channel = make_request(
- b"POST",
- b"/_matrix/client/r0/user/%s/filter" % (b"@watermelon:test"),
+ "POST",
+ "/_matrix/client/r0/user/%s/filter" % ("@watermelon:test"),
self.EXAMPLE_FILTER_JSON,
)
request.render(self.resource)
@@ -101,8 +101,8 @@ class FilterTestCase(unittest.TestCase):
_is_mine = self.hs.is_mine
self.hs.is_mine = lambda target_user: False
request, channel = make_request(
- b"POST",
- b"/_matrix/client/r0/user/%s/filter" % (self.USER_ID),
+ "POST",
+ "/_matrix/client/r0/user/%s/filter" % (self.USER_ID),
self.EXAMPLE_FILTER_JSON,
)
request.render(self.resource)
@@ -119,7 +119,7 @@ class FilterTestCase(unittest.TestCase):
self.clock.advance(1)
filter_id = filter_id.result
request, channel = make_request(
- b"GET", b"/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id)
+ "GET", "/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id)
)
request.render(self.resource)
wait_until_result(self.clock, channel)
@@ -129,7 +129,7 @@ class FilterTestCase(unittest.TestCase):
def test_get_filter_non_existant(self):
request, channel = make_request(
- b"GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID)
+ "GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID)
)
request.render(self.resource)
wait_until_result(self.clock, channel)
@@ -141,7 +141,7 @@ class FilterTestCase(unittest.TestCase):
# in errors.py
def test_get_filter_invalid_id(self):
request, channel = make_request(
- b"GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID)
+ "GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID)
)
request.render(self.resource)
wait_until_result(self.clock, channel)
@@ -151,7 +151,7 @@ class FilterTestCase(unittest.TestCase):
# No ID also returns an invalid_id error
def test_get_filter_no_id(self):
request, channel = make_request(
- b"GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID)
+ "GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID)
)
request.render(self.resource)
wait_until_result(self.clock, channel)
diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py
index e004d8fc73..f6293f11a8 100644
--- a/tests/rest/client/v2_alpha/test_register.py
+++ b/tests/rest/client/v2_alpha/test_register.py
@@ -81,7 +81,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
"access_token": token,
"home_server": self.hs.hostname,
}
- self.assertDictContainsSubset(det_data, json.loads(channel.result["body"]))
+ self.assertDictContainsSubset(det_data, channel.json_body)
def test_POST_appservice_registration_invalid(self):
self.appservice = None # no application service exists
@@ -102,7 +102,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"400", channel.result)
self.assertEquals(
- json.loads(channel.result["body"])["error"], "Invalid password"
+ channel.json_body["error"], "Invalid password"
)
def test_POST_bad_username(self):
@@ -113,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"400", channel.result)
self.assertEquals(
- json.loads(channel.result["body"])["error"], "Invalid username"
+ channel.json_body["error"], "Invalid username"
)
def test_POST_user_valid(self):
@@ -140,7 +140,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
"device_id": device_id,
}
self.assertEquals(channel.result["code"], b"200", channel.result)
- self.assertDictContainsSubset(det_data, json.loads(channel.result["body"]))
+ self.assertDictContainsSubset(det_data, channel.json_body)
self.auth_handler.get_login_tuple_for_user_id(
user_id, device_id=device_id, initial_device_display_name=None
)
@@ -158,7 +158,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"403", channel.result)
self.assertEquals(
- json.loads(channel.result["body"])["error"],
+ channel.json_body["error"],
"Registration has been disabled",
)
@@ -178,7 +178,7 @@ class RegisterRestServletTestCase(unittest.TestCase):
"device_id": "guest_device",
}
self.assertEquals(channel.result["code"], b"200", channel.result)
- self.assertDictContainsSubset(det_data, json.loads(channel.result["body"]))
+ self.assertDictContainsSubset(det_data, channel.json_body)
def test_POST_disabled_guest_registration(self):
self.hs.config.allow_guest_access = False
@@ -189,5 +189,5 @@ class RegisterRestServletTestCase(unittest.TestCase):
self.assertEquals(channel.result["code"], b"403", channel.result)
self.assertEquals(
- json.loads(channel.result["body"])["error"], "Guest access is disabled"
+ channel.json_body["error"], "Guest access is disabled"
)
diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py
index 03ec3993b2..bafc0d1df0 100644
--- a/tests/rest/client/v2_alpha/test_sync.py
+++ b/tests/rest/client/v2_alpha/test_sync.py
@@ -32,7 +32,7 @@ PATH_PREFIX = "/_matrix/client/v2_alpha"
class FilterTestCase(unittest.TestCase):
- USER_ID = b"@apple:test"
+ USER_ID = "@apple:test"
TO_REGISTER = [sync]
def setUp(self):
@@ -68,7 +68,7 @@ class FilterTestCase(unittest.TestCase):
r.register_servlets(self.hs, self.resource)
def test_sync_argless(self):
- request, channel = make_request(b"GET", b"/_matrix/client/r0/sync")
+ request, channel = make_request("GET", "/_matrix/client/r0/sync")
request.render(self.resource)
wait_until_result(self.clock, channel)
diff --git a/tests/server.py b/tests/server.py
index c611dd6059..e249668d21 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -11,6 +11,7 @@ from twisted.python.failure import Failure
from twisted.test.proto_helpers import MemoryReactorClock
from synapse.http.site import SynapseRequest
+from synapse.util import Clock
from tests.utils import setup_test_homeserver as _sth
@@ -28,7 +29,13 @@ class FakeChannel(object):
def json_body(self):
if not self.result:
raise Exception("No result yet.")
- return json.loads(self.result["body"])
+ return json.loads(self.result["body"].decode('utf8'))
+
+ @property
+ def code(self):
+ if not self.result:
+ raise Exception("No result yet.")
+ return int(self.result["code"])
def writeHeaders(self, version, code, reason, headers):
self.result["version"] = version
@@ -79,11 +86,16 @@ def make_request(method, path, content=b""):
Make a web request using the given method and path, feed it the
content, and return the Request and the Channel underneath.
"""
+ if not isinstance(method, bytes):
+ method = method.encode('ascii')
+
+ if not isinstance(path, bytes):
+ path = path.encode('ascii')
# Decorate it to be the full path
if not path.startswith(b"/_matrix"):
path = b"/_matrix/client/r0/" + path
- path = path.replace("//", "/")
+ path = path.replace(b"//", b"/")
if isinstance(content, text_type):
content = content.encode('utf8')
@@ -191,3 +203,9 @@ def setup_test_homeserver(*args, **kwargs):
clock.threadpool = ThreadPool()
pool.threadpool = ThreadPool()
return d
+
+
+def get_clock():
+ clock = ThreadedMemoryReactorClock()
+ hs_clock = Clock(clock)
+ return (clock, hs_clock)
diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py
index 30683e7888..69412c5aad 100644
--- a/tests/storage/test_event_federation.py
+++ b/tests/storage/test_event_federation.py
@@ -49,7 +49,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase):
'INSERT INTO event_reference_hashes '
'(event_id, algorithm, hash) '
"VALUES (?, 'sha256', ?)"
- ), (event_id, 'ffff'))
+ ), (event_id, b'ffff'))
for i in range(0, 11):
yield self.store.runInteraction("insert", insert_event, i)
diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py
index 7a76d67b8c..f7871cd426 100644
--- a/tests/storage/test_state.py
+++ b/tests/storage/test_state.py
@@ -176,7 +176,7 @@ class StateStoreTestCase(tests.unittest.TestCase):
room_id = self.room.to_string()
group_ids = yield self.store.get_state_groups_ids(room_id, [e5.event_id])
- group = group_ids.keys()[0]
+ group = list(group_ids.keys())[0]
# test _get_some_state_from_cache correctly filters out members with types=[]
(state_dict, is_all) = yield self.store._get_some_state_from_cache(
diff --git a/tests/test_server.py b/tests/test_server.py
index 7e063c0290..fc396226ea 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -1,4 +1,3 @@
-import json
import re
from twisted.internet.defer import Deferred
@@ -104,9 +103,8 @@ class JsonResourceTests(unittest.TestCase):
request.render(res)
self.assertEqual(channel.result["code"], b'403')
- reply_body = json.loads(channel.result["body"])
- self.assertEqual(reply_body["error"], "Forbidden!!one!")
- self.assertEqual(reply_body["errcode"], "M_FORBIDDEN")
+ self.assertEqual(channel.json_body["error"], "Forbidden!!one!")
+ self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN")
def test_no_handler(self):
"""
@@ -126,6 +124,5 @@ class JsonResourceTests(unittest.TestCase):
request.render(res)
self.assertEqual(channel.result["code"], b'400')
- reply_body = json.loads(channel.result["body"])
- self.assertEqual(reply_body["error"], "Unrecognized request")
- self.assertEqual(reply_body["errcode"], "M_UNRECOGNIZED")
+ self.assertEqual(channel.json_body["error"], "Unrecognized request")
+ self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED")
diff --git a/tests/utils.py b/tests/utils.py
index e7894819c0..5d49692c58 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -77,6 +77,10 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
config.max_mau_value = 50
config.mau_limits_reserved_threepids = []
+ # we need a sane default_room_version, otherwise attempts to create rooms will
+ # fail.
+ config.default_room_version = "1"
+
# disable user directory updates, because they get done in the
# background, which upsets the test runner.
config.update_user_directory = False
@@ -149,8 +153,9 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None
# Need to let the HS build an auth handler and then mess with it
# because AuthHandler's constructor requires the HS, so we can't make one
# beforehand and pass it in to the HS's constructor (chicken / egg)
- hs.get_auth_handler().hash = lambda p: hashlib.md5(p).hexdigest()
- hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h
+ hs.get_auth_handler().hash = lambda p: hashlib.md5(p.encode('utf8')).hexdigest()
+ hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(
+ p.encode('utf8')).hexdigest() == h
fed = kargs.get("resource_for_federation", None)
if fed:
@@ -223,8 +228,8 @@ class MockHttpResource(HttpServer):
mock_content.configure_mock(**config)
mock_request.content = mock_content
- mock_request.method = http_method
- mock_request.uri = path
+ mock_request.method = http_method.encode('ascii')
+ mock_request.uri = path.encode('ascii')
mock_request.getClientIP.return_value = "-"
|