diff options
85 files changed, 575 insertions, 383 deletions
diff --git a/CHANGES.md b/CHANGES.md index a4c3ce31ae..a299110a6b 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,76 @@ +Synapse 0.33.2 (2018-08-09) +=========================== + +No significant changes. + + +Synapse 0.33.2rc1 (2018-08-07) +============================== + +Features +-------- + +- add support for the lazy_loaded_members filter as per MSC1227 ([\#2970](https://github.com/matrix-org/synapse/issues/2970)) +- add support for the include_redundant_members filter param as per MSC1227 ([\#3331](https://github.com/matrix-org/synapse/issues/3331)) +- Add metrics to track resource usage by background processes ([\#3553](https://github.com/matrix-org/synapse/issues/3553), [\#3556](https://github.com/matrix-org/synapse/issues/3556), [\#3604](https://github.com/matrix-org/synapse/issues/3604), [\#3610](https://github.com/matrix-org/synapse/issues/3610)) +- Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric ([\#3554](https://github.com/matrix-org/synapse/issues/3554)) +- Add support for client_reader to handle more APIs ([\#3555](https://github.com/matrix-org/synapse/issues/3555), [\#3597](https://github.com/matrix-org/synapse/issues/3597)) +- make the /context API filter & lazy-load aware as per MSC1227 ([\#3567](https://github.com/matrix-org/synapse/issues/3567)) +- Add ability to limit number of monthly active users on the server ([\#3630](https://github.com/matrix-org/synapse/issues/3630)) +- When we fail to join a room over federation, pass the error code back to the client. ([\#3639](https://github.com/matrix-org/synapse/issues/3639)) +- Add a new /admin/register API for non-interactively creating users. ([\#3415](https://github.com/matrix-org/synapse/issues/3415)) + + +Bugfixes +-------- + +- Make /directory/list API return 404 for room not found instead of 400 ([\#2952](https://github.com/matrix-org/synapse/issues/2952)) +- Default inviter_display_name to mxid for email invites ([\#3391](https://github.com/matrix-org/synapse/issues/3391)) +- Don't generate TURN credentials if no TURN config options are set ([\#3514](https://github.com/matrix-org/synapse/issues/3514)) +- Correctly announce deleted devices over federation ([\#3520](https://github.com/matrix-org/synapse/issues/3520)) +- Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis. ([\#3548](https://github.com/matrix-org/synapse/issues/3548)) +- Unicode passwords are now normalised before hashing, preventing the instance where two different devices or browsers might send a different UTF-8 sequence for the password. ([\#3569](https://github.com/matrix-org/synapse/issues/3569)) +- Fix potential stack overflow and deadlock under heavy load ([\#3570](https://github.com/matrix-org/synapse/issues/3570)) +- Respond with M_NOT_FOUND when profiles are not found locally or over federation. Fixes #3585 ([\#3585](https://github.com/matrix-org/synapse/issues/3585)) +- Fix failure to persist events over federation under load ([\#3601](https://github.com/matrix-org/synapse/issues/3601)) +- Fix updating of cached remote profiles ([\#3605](https://github.com/matrix-org/synapse/issues/3605)) +- Fix 'tuple index out of range' error ([\#3607](https://github.com/matrix-org/synapse/issues/3607)) +- Only import secrets when available (fix for py < 3.6) ([\#3626](https://github.com/matrix-org/synapse/issues/3626)) + + +Internal Changes +---------------- + +- Remove redundant checks on who_forgot_in_room ([\#3350](https://github.com/matrix-org/synapse/issues/3350)) +- Remove unnecessary event re-signing hacks ([\#3367](https://github.com/matrix-org/synapse/issues/3367)) +- Rewrite cache list decorator ([\#3384](https://github.com/matrix-org/synapse/issues/3384)) +- Move v1-only REST APIs into their own module. ([\#3460](https://github.com/matrix-org/synapse/issues/3460)) +- Replace more instances of Python 2-only iteritems and itervalues uses. ([\#3562](https://github.com/matrix-org/synapse/issues/3562)) +- Refactor EventContext to accept state during init ([\#3577](https://github.com/matrix-org/synapse/issues/3577)) +- Improve Dockerfile and docker-compose instructions ([\#3543](https://github.com/matrix-org/synapse/issues/3543)) +- Release notes are now in the Markdown format. ([\#3552](https://github.com/matrix-org/synapse/issues/3552)) +- add config for pep8 ([\#3559](https://github.com/matrix-org/synapse/issues/3559)) +- Merge Linearizer and Limiter ([\#3571](https://github.com/matrix-org/synapse/issues/3571), [\#3572](https://github.com/matrix-org/synapse/issues/3572)) +- Lazily load state on master process when using workers to reduce DB consumption ([\#3579](https://github.com/matrix-org/synapse/issues/3579), [\#3581](https://github.com/matrix-org/synapse/issues/3581), [\#3582](https://github.com/matrix-org/synapse/issues/3582), [\#3584](https://github.com/matrix-org/synapse/issues/3584)) +- Fixes and optimisations for resolve_state_groups ([\#3586](https://github.com/matrix-org/synapse/issues/3586)) +- Improve logging for exceptions when handling PDUs ([\#3587](https://github.com/matrix-org/synapse/issues/3587)) +- Add some measure blocks to persist_events ([\#3590](https://github.com/matrix-org/synapse/issues/3590)) +- Fix some random logcontext leaks. ([\#3591](https://github.com/matrix-org/synapse/issues/3591), [\#3606](https://github.com/matrix-org/synapse/issues/3606)) +- Speed up calculating state deltas in persist_event loop ([\#3592](https://github.com/matrix-org/synapse/issues/3592)) +- Attempt to reduce amount of state pulled out of DB during persist_events ([\#3595](https://github.com/matrix-org/synapse/issues/3595)) +- Fix a documentation typo in on_make_leave_request ([\#3609](https://github.com/matrix-org/synapse/issues/3609)) +- Make EventStore inherit from EventFederationStore ([\#3612](https://github.com/matrix-org/synapse/issues/3612)) +- Remove some redundant joins on event_edges.room_id ([\#3613](https://github.com/matrix-org/synapse/issues/3613)) +- Stop populating events.content ([\#3614](https://github.com/matrix-org/synapse/issues/3614)) +- Update the /send_leave path registration to use event_id rather than a transaction ID. ([\#3616](https://github.com/matrix-org/synapse/issues/3616)) +- Refactor FederationHandler to move DB writes into separate functions ([\#3621](https://github.com/matrix-org/synapse/issues/3621)) +- Remove unused field "pdu_failures" from transactions. ([\#3628](https://github.com/matrix-org/synapse/issues/3628)) +- rename replication_layer to federation_client ([\#3634](https://github.com/matrix-org/synapse/issues/3634)) +- Factor out exception handling in federation_client ([\#3638](https://github.com/matrix-org/synapse/issues/3638)) +- Refactor location of docker build script. ([\#3644](https://github.com/matrix-org/synapse/issues/3644)) +- Update CONTRIBUTING to mention newsfragments. ([\#3645](https://github.com/matrix-org/synapse/issues/3645)) + + Synapse 0.33.1 (2018-08-02) =========================== diff --git a/changelog.d/2952.bugfix b/changelog.d/2952.bugfix deleted file mode 100644 index 07a3e48304..0000000000 --- a/changelog.d/2952.bugfix +++ /dev/null @@ -1 +0,0 @@ -Make /directory/list API return 404 for room not found instead of 400 diff --git a/changelog.d/2970.feature b/changelog.d/2970.feature deleted file mode 100644 index 5eb928563f..0000000000 --- a/changelog.d/2970.feature +++ /dev/null @@ -1 +0,0 @@ -add support for the lazy_loaded_members filter as per MSC1227 diff --git a/changelog.d/3331.feature b/changelog.d/3331.feature deleted file mode 100644 index e574b9bcc3..0000000000 --- a/changelog.d/3331.feature +++ /dev/null @@ -1 +0,0 @@ -add support for the include_redundant_members filter param as per MSC1227 diff --git a/changelog.d/3350.misc b/changelog.d/3350.misc deleted file mode 100644 index 3713cd6d63..0000000000 --- a/changelog.d/3350.misc +++ /dev/null @@ -1 +0,0 @@ -Remove redundant checks on who_forgot_in_room \ No newline at end of file diff --git a/changelog.d/3367.misc b/changelog.d/3367.misc deleted file mode 100644 index 1f21ddea48..0000000000 --- a/changelog.d/3367.misc +++ /dev/null @@ -1 +0,0 @@ -Remove unnecessary event re-signing hacks \ No newline at end of file diff --git a/changelog.d/3384.misc b/changelog.d/3384.misc deleted file mode 100644 index 5d56c876d9..0000000000 --- a/changelog.d/3384.misc +++ /dev/null @@ -1 +0,0 @@ -Rewrite cache list decorator diff --git a/changelog.d/3391.bugfix b/changelog.d/3391.bugfix deleted file mode 100644 index 88eeb50df2..0000000000 --- a/changelog.d/3391.bugfix +++ /dev/null @@ -1 +0,0 @@ -Default inviter_display_name to mxid for email invites \ No newline at end of file diff --git a/changelog.d/3415.misc b/changelog.d/3415.misc deleted file mode 100644 index e69de29bb2..0000000000 --- a/changelog.d/3415.misc +++ /dev/null diff --git a/changelog.d/3460.misc b/changelog.d/3460.misc deleted file mode 100644 index e69de29bb2..0000000000 --- a/changelog.d/3460.misc +++ /dev/null diff --git a/changelog.d/3514.bugfix b/changelog.d/3514.bugfix deleted file mode 100644 index 460fe24ac9..0000000000 --- a/changelog.d/3514.bugfix +++ /dev/null @@ -1 +0,0 @@ -Don't generate TURN credentials if no TURN config options are set diff --git a/changelog.d/3520.bugfix b/changelog.d/3520.bugfix deleted file mode 100644 index 9278cb3708..0000000000 --- a/changelog.d/3520.bugfix +++ /dev/null @@ -1 +0,0 @@ -Correctly announce deleted devices over federation diff --git a/changelog.d/3543.misc b/changelog.d/3543.misc deleted file mode 100644 index d231d17749..0000000000 --- a/changelog.d/3543.misc +++ /dev/null @@ -1 +0,0 @@ -Improve Dockerfile and docker-compose instructions diff --git a/changelog.d/3548.bugfix b/changelog.d/3548.bugfix deleted file mode 100644 index 38dc3b1232..0000000000 --- a/changelog.d/3548.bugfix +++ /dev/null @@ -1 +0,0 @@ -Catch failures saving metrics captured by Measure, and instead log the faulty metrics information for further analysis. diff --git a/changelog.d/3552.misc b/changelog.d/3552.misc deleted file mode 100644 index 709c3282b4..0000000000 --- a/changelog.d/3552.misc +++ /dev/null @@ -1 +0,0 @@ -Release notes are now in the Markdown format. diff --git a/changelog.d/3553.feature b/changelog.d/3553.feature deleted file mode 100644 index 77a294cb9f..0000000000 --- a/changelog.d/3553.feature +++ /dev/null @@ -1 +0,0 @@ -Add metrics to track resource usage by background processes diff --git a/changelog.d/3554.feature b/changelog.d/3554.feature deleted file mode 100644 index b00397872c..0000000000 --- a/changelog.d/3554.feature +++ /dev/null @@ -1 +0,0 @@ -Add `code` label to `synapse_http_server_response_time_seconds` prometheus metric diff --git a/changelog.d/3555.feature b/changelog.d/3555.feature deleted file mode 100644 index ea4a85e0ae..0000000000 --- a/changelog.d/3555.feature +++ /dev/null @@ -1 +0,0 @@ -Add support for client_reader to handle more APIs diff --git a/changelog.d/3556.feature b/changelog.d/3556.feature deleted file mode 100644 index 77a294cb9f..0000000000 --- a/changelog.d/3556.feature +++ /dev/null @@ -1 +0,0 @@ -Add metrics to track resource usage by background processes diff --git a/changelog.d/3559.misc b/changelog.d/3559.misc deleted file mode 100644 index 26df859e45..0000000000 --- a/changelog.d/3559.misc +++ /dev/null @@ -1 +0,0 @@ -add config for pep8 diff --git a/changelog.d/3562.misc b/changelog.d/3562.misc deleted file mode 100644 index e69de29bb2..0000000000 --- a/changelog.d/3562.misc +++ /dev/null diff --git a/changelog.d/3567.feature b/changelog.d/3567.feature deleted file mode 100644 index c74c1f57a9..0000000000 --- a/changelog.d/3567.feature +++ /dev/null @@ -1 +0,0 @@ -make the /context API filter & lazy-load aware as per MSC1227 diff --git a/changelog.d/3569.bugfix b/changelog.d/3569.bugfix deleted file mode 100644 index d77f035ee0..0000000000 --- a/changelog.d/3569.bugfix +++ /dev/null @@ -1 +0,0 @@ -Unicode passwords are now normalised before hashing, preventing the instance where two different devices or browsers might send a different UTF-8 sequence for the password. diff --git a/changelog.d/3570.bugfix b/changelog.d/3570.bugfix deleted file mode 100644 index cec5158a99..0000000000 --- a/changelog.d/3570.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix potential stack overflow and deadlock under heavy load \ No newline at end of file diff --git a/changelog.d/3571.misc b/changelog.d/3571.misc deleted file mode 100644 index 8908324e68..0000000000 --- a/changelog.d/3571.misc +++ /dev/null @@ -1 +0,0 @@ -Merge Linearizer and Limiter diff --git a/changelog.d/3572.misc b/changelog.d/3572.misc deleted file mode 100644 index 8908324e68..0000000000 --- a/changelog.d/3572.misc +++ /dev/null @@ -1 +0,0 @@ -Merge Linearizer and Limiter diff --git a/changelog.d/3577.misc b/changelog.d/3577.misc deleted file mode 100644 index e69de29bb2..0000000000 --- a/changelog.d/3577.misc +++ /dev/null diff --git a/changelog.d/3579.misc b/changelog.d/3579.misc deleted file mode 100644 index 2374dc0c44..0000000000 --- a/changelog.d/3579.misc +++ /dev/null @@ -1 +0,0 @@ -Lazily load state on master process when using workers to reduce DB consumption diff --git a/changelog.d/3581.misc b/changelog.d/3581.misc deleted file mode 100644 index 2374dc0c44..0000000000 --- a/changelog.d/3581.misc +++ /dev/null @@ -1 +0,0 @@ -Lazily load state on master process when using workers to reduce DB consumption diff --git a/changelog.d/3582.misc b/changelog.d/3582.misc deleted file mode 100644 index 2374dc0c44..0000000000 --- a/changelog.d/3582.misc +++ /dev/null @@ -1 +0,0 @@ -Lazily load state on master process when using workers to reduce DB consumption diff --git a/changelog.d/3584.misc b/changelog.d/3584.misc deleted file mode 100644 index 2374dc0c44..0000000000 --- a/changelog.d/3584.misc +++ /dev/null @@ -1 +0,0 @@ -Lazily load state on master process when using workers to reduce DB consumption diff --git a/changelog.d/3585.bugfix b/changelog.d/3585.bugfix deleted file mode 100644 index e8ae1d8cb4..0000000000 --- a/changelog.d/3585.bugfix +++ /dev/null @@ -1 +0,0 @@ -Respond with M_NOT_FOUND when profiles are not found locally or over federation. Fixes #3585 diff --git a/changelog.d/3586.misc b/changelog.d/3586.misc deleted file mode 100644 index e853e2481b..0000000000 --- a/changelog.d/3586.misc +++ /dev/null @@ -1 +0,0 @@ -Fixes and optimisations for resolve_state_groups diff --git a/changelog.d/3587.misc b/changelog.d/3587.misc deleted file mode 100644 index 75a3479910..0000000000 --- a/changelog.d/3587.misc +++ /dev/null @@ -1 +0,0 @@ -Improve logging for exceptions when handling PDUs \ No newline at end of file diff --git a/changelog.d/3590.misc b/changelog.d/3590.misc deleted file mode 100644 index 0f1688fd0f..0000000000 --- a/changelog.d/3590.misc +++ /dev/null @@ -1 +0,0 @@ -Add some measure blocks to persist_events diff --git a/changelog.d/3591.misc b/changelog.d/3591.misc deleted file mode 100644 index f0137766a0..0000000000 --- a/changelog.d/3591.misc +++ /dev/null @@ -1 +0,0 @@ -Fix some random logcontext leaks. \ No newline at end of file diff --git a/changelog.d/3592.misc b/changelog.d/3592.misc deleted file mode 100644 index 60129569c2..0000000000 --- a/changelog.d/3592.misc +++ /dev/null @@ -1 +0,0 @@ -Speed up calculating state deltas in persist_event loop diff --git a/changelog.d/3595.misc b/changelog.d/3595.misc deleted file mode 100644 index 85903504cc..0000000000 --- a/changelog.d/3595.misc +++ /dev/null @@ -1 +0,0 @@ -Attempt to reduce amount of state pulled out of DB during persist_events diff --git a/changelog.d/3597.feature b/changelog.d/3597.feature deleted file mode 100644 index ea4a85e0ae..0000000000 --- a/changelog.d/3597.feature +++ /dev/null @@ -1 +0,0 @@ -Add support for client_reader to handle more APIs diff --git a/changelog.d/3601.bugfix b/changelog.d/3601.bugfix deleted file mode 100644 index 1678b261d0..0000000000 --- a/changelog.d/3601.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix failure to persist events over federation under load diff --git a/changelog.d/3604.feature b/changelog.d/3604.feature deleted file mode 100644 index 77a294cb9f..0000000000 --- a/changelog.d/3604.feature +++ /dev/null @@ -1 +0,0 @@ -Add metrics to track resource usage by background processes diff --git a/changelog.d/3605.bugfix b/changelog.d/3605.bugfix deleted file mode 100644 index 786da546eb..0000000000 --- a/changelog.d/3605.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix updating of cached remote profiles diff --git a/changelog.d/3606.misc b/changelog.d/3606.misc deleted file mode 100644 index f0137766a0..0000000000 --- a/changelog.d/3606.misc +++ /dev/null @@ -1 +0,0 @@ -Fix some random logcontext leaks. \ No newline at end of file diff --git a/changelog.d/3607.bugfix b/changelog.d/3607.bugfix deleted file mode 100644 index 7ad64593b8..0000000000 --- a/changelog.d/3607.bugfix +++ /dev/null @@ -1 +0,0 @@ -Fix 'tuple index out of range' error \ No newline at end of file diff --git a/changelog.d/3609.misc b/changelog.d/3609.misc deleted file mode 100644 index 5b9566d076..0000000000 --- a/changelog.d/3609.misc +++ /dev/null @@ -1 +0,0 @@ -Fix a documentation typo in on_make_leave_request diff --git a/changelog.d/3610.feature b/changelog.d/3610.feature deleted file mode 100644 index 77a294cb9f..0000000000 --- a/changelog.d/3610.feature +++ /dev/null @@ -1 +0,0 @@ -Add metrics to track resource usage by background processes diff --git a/changelog.d/3612.misc b/changelog.d/3612.misc deleted file mode 100644 index f90d2f2ff5..0000000000 --- a/changelog.d/3612.misc +++ /dev/null @@ -1 +0,0 @@ -Make EventStore inherit from EventFederationStore diff --git a/changelog.d/3613.misc b/changelog.d/3613.misc deleted file mode 100644 index d9378f6b49..0000000000 --- a/changelog.d/3613.misc +++ /dev/null @@ -1 +0,0 @@ -Remove some redundant joins on event_edges.room_id diff --git a/changelog.d/3614.misc b/changelog.d/3614.misc deleted file mode 100644 index 356f28471e..0000000000 --- a/changelog.d/3614.misc +++ /dev/null @@ -1 +0,0 @@ -Stop populating events.content diff --git a/changelog.d/3616.misc b/changelog.d/3616.misc deleted file mode 100644 index 04629faa36..0000000000 --- a/changelog.d/3616.misc +++ /dev/null @@ -1 +0,0 @@ -Update the /send_leave path registration to use event_id rather than a transaction ID. diff --git a/changelog.d/3621.misc b/changelog.d/3621.misc deleted file mode 100644 index 83e5fc8aa1..0000000000 --- a/changelog.d/3621.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor FederationHandler to move DB writes into separate functions diff --git a/changelog.d/3626.bugfix b/changelog.d/3626.bugfix deleted file mode 100644 index 9a4b878986..0000000000 --- a/changelog.d/3626.bugfix +++ /dev/null @@ -1 +0,0 @@ -Only import secrets when available (fix for py < 3.6) diff --git a/changelog.d/3628.misc b/changelog.d/3628.misc deleted file mode 100644 index 1aebefbe18..0000000000 --- a/changelog.d/3628.misc +++ /dev/null @@ -1 +0,0 @@ -Remove unused field "pdu_failures" from transactions. diff --git a/changelog.d/3630.feature b/changelog.d/3630.feature deleted file mode 100644 index 8007a04840..0000000000 --- a/changelog.d/3630.feature +++ /dev/null @@ -1 +0,0 @@ -Add ability to limit number of monthly active users on the server diff --git a/changelog.d/3632.misc b/changelog.d/3632.misc new file mode 100644 index 0000000000..9d64bbe83b --- /dev/null +++ b/changelog.d/3632.misc @@ -0,0 +1 @@ +Refactor HTTP replication endpoints to reduce code duplication diff --git a/changelog.d/3634.misc b/changelog.d/3634.misc deleted file mode 100644 index 2cd6af91ff..0000000000 --- a/changelog.d/3634.misc +++ /dev/null @@ -1 +0,0 @@ -rename replication_layer to federation_client diff --git a/changelog.d/3638.misc b/changelog.d/3638.misc deleted file mode 100644 index 9faab15cf2..0000000000 --- a/changelog.d/3638.misc +++ /dev/null @@ -1 +0,0 @@ -Factor out exception handling in federation_client diff --git a/changelog.d/3639.feature b/changelog.d/3639.feature deleted file mode 100644 index c8c387e219..0000000000 --- a/changelog.d/3639.feature +++ /dev/null @@ -1 +0,0 @@ -When we fail to join a room over federation, pass the error code back to the client. \ No newline at end of file diff --git a/changelog.d/3644.misc b/changelog.d/3644.misc deleted file mode 100644 index 2347fc8500..0000000000 --- a/changelog.d/3644.misc +++ /dev/null @@ -1 +0,0 @@ -Refactor location of docker build script. diff --git a/changelog.d/3645.misc b/changelog.d/3645.misc deleted file mode 100644 index 0fe6b28da1..0000000000 --- a/changelog.d/3645.misc +++ /dev/null @@ -1 +0,0 @@ -Update CONTRIBUTING to mention newsfragments. diff --git a/changelog.d/3647.misc b/changelog.d/3647.misc new file mode 100644 index 0000000000..dbc66dae60 --- /dev/null +++ b/changelog.d/3647.misc @@ -0,0 +1 @@ +Tests now correctly execute on Python 3. diff --git a/changelog.d/3664.feature b/changelog.d/3664.feature new file mode 100644 index 0000000000..184dde9939 --- /dev/null +++ b/changelog.d/3664.feature @@ -0,0 +1 @@ +Add some metrics for the appservice and federation event sending loops diff --git a/pyproject.toml b/pyproject.toml index f4d6f0c6bb..dd099dc9c8 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -2,7 +2,7 @@ package = "synapse" filename = "CHANGES.md" directory = "changelog.d" - issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue}>)" + issue_format = "[\\#{issue}](https://github.com/matrix-org/synapse/issues/{issue})" [[tool.towncrier.type]] directory = "feature" diff --git a/synapse/__init__.py b/synapse/__init__.py index 1810cb6fcd..a14d578e36 100644 --- a/synapse/__init__.py +++ b/synapse/__init__.py @@ -17,4 +17,4 @@ """ This is a reference implementation of a Matrix home server. """ -__version__ = "0.33.1" +__version__ = "0.33.2" diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 78f9d40a3a..f603c8a368 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException from synapse.handlers.presence import format_user_presence_state, get_interested_remotes from synapse.metrics import ( LaterGauge, + event_processing_loop_counter, + event_processing_loop_room_count, events_processed_counter, sent_edus_counter, sent_transactions_counter, @@ -253,7 +255,13 @@ class TransactionQueue(object): synapse.metrics.event_processing_last_ts.labels( "federation_sender").set(ts) - events_processed_counter.inc(len(events)) + events_processed_counter.inc(len(events)) + + event_processing_loop_room_count.labels( + "federation_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("federation_sender").inc() synapse.metrics.event_processing_positions.labels( "federation_sender").set(next_token) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ee41aed69e..f0f89af7dc 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,6 +23,10 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.metrics import ( + event_processing_loop_counter, + event_processing_loop_room_count, +) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure @@ -136,6 +140,12 @@ class ApplicationServicesHandler(object): events_processed_counter.inc(len(events)) + event_processing_loop_room_count.labels( + "appservice_sender" + ).inc(len(events_by_room)) + + event_processing_loop_counter.labels("appservice_sender").inc() + synapse.metrics.event_processing_lag.labels( "appservice_sender").set(now - ts) synapse.metrics.event_processing_last_ts.labels( diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 39d7724778..bcb093ba3e 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -30,7 +30,7 @@ from synapse.api.urls import ConsentURIBuilder from synapse.crypto.event_signing import add_hashes_and_signatures from synapse.events.utils import serialize_event from synapse.events.validator import EventValidator -from synapse.replication.http.send_event import send_event_to_master +from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.types import RoomAlias, UserID from synapse.util.async import Linearizer from synapse.util.frozenutils import frozendict_json_encoder @@ -171,7 +171,7 @@ class EventCreationHandler(object): self.notifier = hs.get_notifier() self.config = hs.config - self.http_client = hs.get_simple_http_client() + self.send_event_to_master = ReplicationSendEventRestServlet.make_client(hs) # This is only used to get at ratelimit function, and maybe_kick_guest_users self.base_handler = BaseHandler(hs) @@ -559,12 +559,9 @@ class EventCreationHandler(object): try: # If we're a worker we need to hit out to the master. if self.config.worker_app: - yield send_event_to_master( - clock=self.hs.get_clock(), + yield self.send_event_to_master( + event_id=event.event_id, store=self.store, - client=self.http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, requester=requester, event=event, context=context, diff --git a/synapse/handlers/room_member_worker.py b/synapse/handlers/room_member_worker.py index 22d8b4b0d3..acc6eb8099 100644 --- a/synapse/handlers/room_member_worker.py +++ b/synapse/handlers/room_member_worker.py @@ -20,16 +20,24 @@ from twisted.internet import defer from synapse.api.errors import SynapseError from synapse.handlers.room_member import RoomMemberHandler from synapse.replication.http.membership import ( - get_or_register_3pid_guest, - notify_user_membership_change, - remote_join, - remote_reject_invite, + ReplicationRegister3PIDGuestRestServlet as Repl3PID, + ReplicationRemoteJoinRestServlet as ReplRemoteJoin, + ReplicationRemoteRejectInviteRestServlet as ReplRejectInvite, + ReplicationUserJoinedLeftRoomRestServlet as ReplJoinedLeft, ) logger = logging.getLogger(__name__) class RoomMemberWorkerHandler(RoomMemberHandler): + def __init__(self, hs): + super(RoomMemberWorkerHandler, self).__init__(hs) + + self._get_register_3pid_client = Repl3PID.make_client(hs) + self._remote_join_client = ReplRemoteJoin.make_client(hs) + self._remote_reject_client = ReplRejectInvite.make_client(hs) + self._notify_change_client = ReplJoinedLeft.make_client(hs) + @defer.inlineCallbacks def _remote_join(self, requester, remote_room_hosts, room_id, user, content): """Implements RoomMemberHandler._remote_join @@ -37,10 +45,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): if len(remote_room_hosts) == 0: raise SynapseError(404, "No known servers") - ret = yield remote_join( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + ret = yield self._remote_join_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -55,10 +60,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target): """Implements RoomMemberHandler._remote_reject_invite """ - return remote_reject_invite( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._remote_reject_client( requester=requester, remote_room_hosts=remote_room_hosts, room_id=room_id, @@ -68,10 +70,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _user_joined_room(self, target, room_id): """Implements RoomMemberHandler._user_joined_room """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="joined", @@ -80,10 +79,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def _user_left_room(self, target, room_id): """Implements RoomMemberHandler._user_left_room """ - return notify_user_membership_change( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._notify_change_client( user_id=target.to_string(), room_id=room_id, change="left", @@ -92,10 +88,7 @@ class RoomMemberWorkerHandler(RoomMemberHandler): def get_or_register_3pid_guest(self, requester, medium, address, inviter_user_id): """Implements RoomMemberHandler.get_or_register_3pid_guest """ - return get_or_register_3pid_guest( - self.simple_http_client, - host=self.config.worker_replication_host, - port=self.config.worker_replication_http_port, + return self._get_register_3pid_client( requester=requester, medium=medium, address=address, diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a9158fc066..550f8443f7 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions events_processed_counter = Counter("synapse_federation_client_events_processed", "") +event_processing_loop_counter = Counter( + "synapse_event_processing_loop_count", + "Event processing loop iterations", + ["name"], +) + +event_processing_loop_room_count = Counter( + "synapse_event_processing_loop_room_count", + "Rooms seen per event processing loop iteration", + ["name"], +) + + # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py new file mode 100644 index 0000000000..5e5376cf58 --- /dev/null +++ b/synapse/replication/http/_base.py @@ -0,0 +1,215 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import abc +import logging +import re + +from six.moves import urllib + +from twisted.internet import defer + +from synapse.api.errors import CodeMessageException, HttpResponseException +from synapse.util.caches.response_cache import ResponseCache +from synapse.util.stringutils import random_string + +logger = logging.getLogger(__name__) + + +class ReplicationEndpoint(object): + """Helper base class for defining new replication HTTP endpoints. + + This creates an endpoint under `/_synapse/replication/:NAME/:PATH_ARGS..` + (with an `/:txn_id` prefix for cached requests.), where NAME is a name, + PATH_ARGS are a tuple of parameters to be encoded in the URL. + + For example, if `NAME` is "send_event" and `PATH_ARGS` is `("event_id",)`, + with `CACHE` set to true then this generates an endpoint: + + /_synapse/replication/send_event/:event_id/:txn_id + + For POST/PUT requests the payload is serialized to json and sent as the + body, while for GET requests the payload is added as query parameters. See + `_serialize_payload` for details. + + Incoming requests are handled by overriding `_handle_request`. Servers + must call `register` to register the path with the HTTP server. + + Requests can be sent by calling the client returned by `make_client`. + + Attributes: + NAME (str): A name for the endpoint, added to the path as well as used + in logging and metrics. + PATH_ARGS (tuple[str]): A list of parameters to be added to the path. + Adding parameters to the path (rather than payload) can make it + easier to follow along in the log files. + METHOD (str): The method of the HTTP request, defaults to POST. Can be + one of POST, PUT or GET. If GET then the payload is sent as query + parameters rather than a JSON body. + CACHE (bool): Whether server should cache the result of the request/ + If true then transparently adds a txn_id to all requests, and + `_handle_request` must return a Deferred. + RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504 + is received. + """ + + __metaclass__ = abc.ABCMeta + + NAME = abc.abstractproperty() + PATH_ARGS = abc.abstractproperty() + + METHOD = "POST" + CACHE = True + RETRY_ON_TIMEOUT = True + + def __init__(self, hs): + if self.CACHE: + self.response_cache = ResponseCache( + hs, "repl." + self.NAME, + timeout_ms=30 * 60 * 1000, + ) + + assert self.METHOD in ("PUT", "POST", "GET") + + @abc.abstractmethod + def _serialize_payload(**kwargs): + """Static method that is called when creating a request. + + Concrete implementations should have explicit parameters (rather than + kwargs) so that an appropriate exception is raised if the client is + called with unexpected parameters. All PATH_ARGS must appear in + argument list. + + Returns: + Deferred[dict]|dict: If POST/PUT request then dictionary must be + JSON serialisable, otherwise must be appropriate for adding as + query args. + """ + return {} + + @abc.abstractmethod + def _handle_request(self, request, **kwargs): + """Handle incoming request. + + This is called with the request object and PATH_ARGS. + + Returns: + Deferred[dict]: A JSON serialisable dict to be used as response + body of request. + """ + pass + + @classmethod + def make_client(cls, hs): + """Create a client that makes requests. + + Returns a callable that accepts the same parameters as `_serialize_payload`. + """ + clock = hs.get_clock() + host = hs.config.worker_replication_host + port = hs.config.worker_replication_http_port + + client = hs.get_simple_http_client() + + @defer.inlineCallbacks + def send_request(**kwargs): + data = yield cls._serialize_payload(**kwargs) + + url_args = [urllib.parse.quote(kwargs[name]) for name in cls.PATH_ARGS] + + if cls.CACHE: + txn_id = random_string(10) + url_args.append(txn_id) + + if cls.METHOD == "POST": + request_func = client.post_json_get_json + elif cls.METHOD == "PUT": + request_func = client.put_json + elif cls.METHOD == "GET": + request_func = client.get_json + else: + # We have already asserted in the constructor that a + # compatible was picked, but lets be paranoid. + raise Exception( + "Unknown METHOD on %s replication endpoint" % (cls.NAME,) + ) + + uri = "http://%s:%s/_synapse/replication/%s/%s" % ( + host, port, cls.NAME, "/".join(url_args) + ) + + try: + # We keep retrying the same request for timeouts. This is so that we + # have a good idea that the request has either succeeded or failed on + # the master, and so whether we should clean up or not. + while True: + try: + result = yield request_func(uri, data) + break + except CodeMessageException as e: + if e.code != 504 or not cls.RETRY_ON_TIMEOUT: + raise + + logger.warn("%s request timed out", cls.NAME) + + # If we timed out we probably don't need to worry about backing + # off too much, but lets just wait a little anyway. + yield clock.sleep(1) + except HttpResponseException as e: + # We convert to SynapseError as we know that it was a SynapseError + # on the master process that we should send to the client. (And + # importantly, not stack traces everywhere) + raise e.to_synapse_error() + + defer.returnValue(result) + + return send_request + + def register(self, http_server): + """Called by the server to register this as a handler to the + appropriate path. + """ + + url_args = list(self.PATH_ARGS) + handler = self._handle_request + method = self.METHOD + + if self.CACHE: + handler = self._cached_handler + url_args.append("txn_id") + + args = "/".join("(?P<%s>[^/]+)" % (arg,) for arg in url_args) + pattern = re.compile("^/_synapse/replication/%s/%s$" % ( + self.NAME, + args + )) + + http_server.register_paths(method, [pattern], handler) + + def _cached_handler(self, request, txn_id, **kwargs): + """Called on new incoming requests when caching is enabled. Checks + if there is a cached response for the request and returns that, + otherwise calls `_handle_request` and caches its response. + """ + # We just use the txn_id here, but we probably also want to use the + # other PATH_ARGS as well. + + assert self.CACHE + + return self.response_cache.wrap( + txn_id, + self._handle_request, + request, **kwargs + ) diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py index 7a3cfb159c..e58bebf12a 100644 --- a/synapse/replication/http/membership.py +++ b/synapse/replication/http/membership.py @@ -14,182 +14,63 @@ # limitations under the License. import logging -import re from twisted.internet import defer -from synapse.api.errors import HttpResponseException -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID from synapse.util.distributor import user_joined_room, user_left_room logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def remote_join(client, host, port, requester, remote_room_hosts, - room_id, user_id, content): - """Ask the master to do a remote join for the given user to the given room +class ReplicationRemoteJoinRestServlet(ReplicationEndpoint): + """Does a remote join for the given user to the given room - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - remote_room_hosts (list[str]): Servers to try and join via - room_id (str) - user_id (str) - content (dict): The event content to use for the join event + Request format: - Returns: - Deferred - """ - uri = "http://%s:%s/_synapse/replication/remote_join" % (host, port) - - payload = { - "requester": requester.serialize(), - "remote_room_hosts": remote_room_hosts, - "room_id": room_id, - "user_id": user_id, - "content": content, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except HttpResponseException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise e.to_synapse_error() - defer.returnValue(result) - - -@defer.inlineCallbacks -def remote_reject_invite(client, host, port, requester, remote_room_hosts, - room_id, user_id): - """Ask master to reject the invite for the user and room. - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - remote_room_hosts (list[str]): Servers to try and reject via - room_id (str) - user_id (str) - - Returns: - Deferred - """ - uri = "http://%s:%s/_synapse/replication/remote_reject_invite" % (host, port) - - payload = { - "requester": requester.serialize(), - "remote_room_hosts": remote_room_hosts, - "room_id": room_id, - "user_id": user_id, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except HttpResponseException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise e.to_synapse_error() - defer.returnValue(result) - - -@defer.inlineCallbacks -def get_or_register_3pid_guest(client, host, port, requester, - medium, address, inviter_user_id): - """Ask the master to get/create a guest account for given 3PID. - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - medium (str) - address (str) - inviter_user_id (str): The user ID who is trying to invite the - 3PID - - Returns: - Deferred[(str, str)]: A 2-tuple of `(user_id, access_token)` of the - 3PID guest account. - """ + POST /_synapse/replication/remote_join/:room_id/:user_id - uri = "http://%s:%s/_synapse/replication/get_or_register_3pid_guest" % (host, port) - - payload = { - "requester": requester.serialize(), - "medium": medium, - "address": address, - "inviter_user_id": inviter_user_id, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except HttpResponseException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise e.to_synapse_error() - defer.returnValue(result) - - -@defer.inlineCallbacks -def notify_user_membership_change(client, host, port, user_id, room_id, change): - """Notify master that a user has joined or left the room - - Args: - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication. - user_id (str) - room_id (str) - change (str): Either "join" or "left" - - Returns: - Deferred + { + "requester": ..., + "remote_room_hosts": [...], + "content": { ... } + } """ - assert change in ("joined", "left") - - uri = "http://%s:%s/_synapse/replication/user_%s_room" % (host, port, change) - - payload = { - "user_id": user_id, - "room_id": room_id, - } - - try: - result = yield client.post_json_get_json(uri, payload) - except HttpResponseException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise e.to_synapse_error() - defer.returnValue(result) - -class ReplicationRemoteJoinRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/remote_join$")] + NAME = "remote_join" + PATH_ARGS = ("room_id", "user_id",) def __init__(self, hs): - super(ReplicationRemoteJoinRestServlet, self).__init__() + super(ReplicationRemoteJoinRestServlet, self).__init__(hs) self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + @staticmethod + def _serialize_payload(requester, room_id, user_id, remote_room_hosts, + content): + """ + Args: + requester(Requester) + room_id (str) + user_id (str) + remote_room_hosts (list[str]): Servers to try and join via + content(dict): The event content to use for the join event + """ + return { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + "content": content, + } + @defer.inlineCallbacks - def on_POST(self, request): + def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] - room_id = content["room_id"] - user_id = content["user_id"] event_content = content["content"] requester = Requester.deserialize(self.store, content["requester"]) @@ -212,23 +93,48 @@ class ReplicationRemoteJoinRestServlet(RestServlet): defer.returnValue((200, {})) -class ReplicationRemoteRejectInviteRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/remote_reject_invite$")] +class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint): + """Rejects the invite for the user and room. + + Request format: + + POST /_synapse/replication/remote_reject_invite/:room_id/:user_id + + { + "requester": ..., + "remote_room_hosts": [...], + } + """ + + NAME = "remote_reject_invite" + PATH_ARGS = ("room_id", "user_id",) def __init__(self, hs): - super(ReplicationRemoteRejectInviteRestServlet, self).__init__() + super(ReplicationRemoteRejectInviteRestServlet, self).__init__(hs) self.federation_handler = hs.get_handlers().federation_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + @staticmethod + def _serialize_payload(requester, room_id, user_id, remote_room_hosts): + """ + Args: + requester(Requester) + room_id (str) + user_id (str) + remote_room_hosts (list[str]): Servers to try and reject via + """ + return { + "requester": requester.serialize(), + "remote_room_hosts": remote_room_hosts, + } + @defer.inlineCallbacks - def on_POST(self, request): + def _handle_request(self, request, room_id, user_id): content = parse_json_object_from_request(request) remote_room_hosts = content["remote_room_hosts"] - room_id = content["room_id"] - user_id = content["user_id"] requester = Requester.deserialize(self.store, content["requester"]) @@ -264,18 +170,50 @@ class ReplicationRemoteRejectInviteRestServlet(RestServlet): defer.returnValue((200, ret)) -class ReplicationRegister3PIDGuestRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/get_or_register_3pid_guest$")] +class ReplicationRegister3PIDGuestRestServlet(ReplicationEndpoint): + """Gets/creates a guest account for given 3PID. + + Request format: + + POST /_synapse/replication/get_or_register_3pid_guest/ + + { + "requester": ..., + "medium": ..., + "address": ..., + "inviter_user_id": ... + } + """ + + NAME = "get_or_register_3pid_guest" + PATH_ARGS = () def __init__(self, hs): - super(ReplicationRegister3PIDGuestRestServlet, self).__init__() + super(ReplicationRegister3PIDGuestRestServlet, self).__init__(hs) self.registeration_handler = hs.get_handlers().registration_handler self.store = hs.get_datastore() self.clock = hs.get_clock() + @staticmethod + def _serialize_payload(requester, medium, address, inviter_user_id): + """ + Args: + requester(Requester) + medium (str) + address (str) + inviter_user_id (str): The user ID who is trying to invite the + 3PID + """ + return { + "requester": requester.serialize(), + "medium": medium, + "address": address, + "inviter_user_id": inviter_user_id, + } + @defer.inlineCallbacks - def on_POST(self, request): + def _handle_request(self, request): content = parse_json_object_from_request(request) medium = content["medium"] @@ -296,23 +234,41 @@ class ReplicationRegister3PIDGuestRestServlet(RestServlet): defer.returnValue((200, ret)) -class ReplicationUserJoinedLeftRoomRestServlet(RestServlet): - PATTERNS = [re.compile("^/_synapse/replication/user_(?P<change>joined|left)_room$")] +class ReplicationUserJoinedLeftRoomRestServlet(ReplicationEndpoint): + """Notifies that a user has joined or left the room + + Request format: + + POST /_synapse/replication/membership_change/:room_id/:user_id/:change + + {} + """ + + NAME = "membership_change" + PATH_ARGS = ("room_id", "user_id", "change") + CACHE = False # No point caching as should return instantly. def __init__(self, hs): - super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__() + super(ReplicationUserJoinedLeftRoomRestServlet, self).__init__(hs) self.registeration_handler = hs.get_handlers().registration_handler self.store = hs.get_datastore() self.clock = hs.get_clock() self.distributor = hs.get_distributor() - def on_POST(self, request, change): - content = parse_json_object_from_request(request) + @staticmethod + def _serialize_payload(room_id, user_id, change): + """ + Args: + room_id (str) + user_id (str) + change (str): Either "joined" or "left" + """ + assert change in ("joined", "left",) - user_id = content["user_id"] - room_id = content["room_id"] + return {} + def _handle_request(self, request, room_id, user_id, change): logger.info("user membership change: %s in %s", user_id, room_id) user = UserID.from_string(user_id) diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py index d3509dc288..5b52c91650 100644 --- a/synapse/replication/http/send_event.py +++ b/synapse/replication/http/send_event.py @@ -14,86 +14,26 @@ # limitations under the License. import logging -import re from twisted.internet import defer -from synapse.api.errors import CodeMessageException, HttpResponseException from synapse.events import FrozenEvent from synapse.events.snapshot import EventContext -from synapse.http.servlet import RestServlet, parse_json_object_from_request +from synapse.http.servlet import parse_json_object_from_request +from synapse.replication.http._base import ReplicationEndpoint from synapse.types import Requester, UserID -from synapse.util.caches.response_cache import ResponseCache from synapse.util.metrics import Measure logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def send_event_to_master(clock, store, client, host, port, requester, event, context, - ratelimit, extra_users): - """Send event to be handled on the master - - Args: - clock (synapse.util.Clock) - store (DataStore) - client (SimpleHttpClient) - host (str): host of master - port (int): port on master listening for HTTP replication - requester (Requester) - event (FrozenEvent) - context (EventContext) - ratelimit (bool) - extra_users (list(UserID)): Any extra users to notify about event - """ - uri = "http://%s:%s/_synapse/replication/send_event/%s" % ( - host, port, event.event_id, - ) - - serialized_context = yield context.serialize(event, store) - - payload = { - "event": event.get_pdu_json(), - "internal_metadata": event.internal_metadata.get_dict(), - "rejected_reason": event.rejected_reason, - "context": serialized_context, - "requester": requester.serialize(), - "ratelimit": ratelimit, - "extra_users": [u.to_string() for u in extra_users], - } - - try: - # We keep retrying the same request for timeouts. This is so that we - # have a good idea that the request has either succeeded or failed on - # the master, and so whether we should clean up or not. - while True: - try: - result = yield client.put_json(uri, payload) - break - except CodeMessageException as e: - if e.code != 504: - raise - - logger.warn("send_event request timed out") - - # If we timed out we probably don't need to worry about backing - # off too much, but lets just wait a little anyway. - yield clock.sleep(1) - except HttpResponseException as e: - # We convert to SynapseError as we know that it was a SynapseError - # on the master process that we should send to the client. (And - # importantly, not stack traces everywhere) - raise e.to_synapse_error() - defer.returnValue(result) - - -class ReplicationSendEventRestServlet(RestServlet): +class ReplicationSendEventRestServlet(ReplicationEndpoint): """Handles events newly created on workers, including persisting and notifying. The API looks like: - POST /_synapse/replication/send_event/:event_id + POST /_synapse/replication/send_event/:event_id/:txn_id { "event": { .. serialized event .. }, @@ -105,27 +45,47 @@ class ReplicationSendEventRestServlet(RestServlet): "extra_users": [], } """ - PATTERNS = [re.compile("^/_synapse/replication/send_event/(?P<event_id>[^/]+)$")] + NAME = "send_event" + PATH_ARGS = ("event_id",) def __init__(self, hs): - super(ReplicationSendEventRestServlet, self).__init__() + super(ReplicationSendEventRestServlet, self).__init__(hs) self.event_creation_handler = hs.get_event_creation_handler() self.store = hs.get_datastore() self.clock = hs.get_clock() - # The responses are tiny, so we may as well cache them for a while - self.response_cache = ResponseCache(hs, "send_event", timeout_ms=30 * 60 * 1000) + @staticmethod + @defer.inlineCallbacks + def _serialize_payload(event_id, store, event, context, requester, + ratelimit, extra_users): + """ + Args: + event_id (str) + store (DataStore) + requester (Requester) + event (FrozenEvent) + context (EventContext) + ratelimit (bool) + extra_users (list(UserID)): Any extra users to notify about event + """ + + serialized_context = yield context.serialize(event, store) + + payload = { + "event": event.get_pdu_json(), + "internal_metadata": event.internal_metadata.get_dict(), + "rejected_reason": event.rejected_reason, + "context": serialized_context, + "requester": requester.serialize(), + "ratelimit": ratelimit, + "extra_users": [u.to_string() for u in extra_users], + } - def on_PUT(self, request, event_id): - return self.response_cache.wrap( - event_id, - self._handle_request, - request - ) + defer.returnValue(payload) @defer.inlineCallbacks - def _handle_request(self, request): + def _handle_request(self, request, event_id): with Measure(self.clock, "repl_send_event_parse"): content = parse_json_object_from_request(request) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 2c263af1a3..f422cf3c5a 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -48,7 +48,9 @@ def _expect_edu(destination, edu_type, content, origin="test"): def _make_edu_json(origin, edu_type, content): - return json.dumps(_expect_edu("test", edu_type, content, origin=origin)) + return json.dumps( + _expect_edu("test", edu_type, content, origin=origin) + ).encode('utf8') class TypingNotificationsTestCase(unittest.TestCase): diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py index 34e68ae82f..d46c27e7e9 100644 --- a/tests/rest/client/test_transactions.py +++ b/tests/rest/client/test_transactions.py @@ -85,7 +85,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase): try: yield self.cache.fetch_or_execute(self.mock_key, cb) except Exception as e: - self.assertEqual(e.message, "boo") + self.assertEqual(e.args[0], "boo") self.assertIs(LoggingContext.current_context(), test_context) res = yield self.cache.fetch_or_execute(self.mock_key, cb) @@ -111,7 +111,7 @@ class HttpTransactionCacheTestCase(unittest.TestCase): try: yield self.cache.fetch_or_execute(self.mock_key, cb) except Exception as e: - self.assertEqual(e.message, "boo") + self.assertEqual(e.args[0], "boo") self.assertIs(LoggingContext.current_context(), test_context) res = yield self.cache.fetch_or_execute(self.mock_key, cb) diff --git a/tests/rest/client/v1/test_admin.py b/tests/rest/client/v1/test_admin.py index 8c90145601..fb28883d30 100644 --- a/tests/rest/client/v1/test_admin.py +++ b/tests/rest/client/v1/test_admin.py @@ -140,7 +140,7 @@ class UserRegisterTestCase(unittest.TestCase): "admin": True, "mac": want_mac, } - ).encode('utf8') + ) request, channel = make_request("POST", self.url, body.encode('utf8')) render(request, self.resource, self.clock) @@ -168,7 +168,7 @@ class UserRegisterTestCase(unittest.TestCase): "admin": True, "mac": want_mac, } - ).encode('utf8') + ) request, channel = make_request("POST", self.url, body.encode('utf8')) render(request, self.resource, self.clock) @@ -195,7 +195,7 @@ class UserRegisterTestCase(unittest.TestCase): "admin": True, "mac": want_mac, } - ).encode('utf8') + ) request, channel = make_request("POST", self.url, body.encode('utf8')) render(request, self.resource, self.clock) @@ -253,7 +253,7 @@ class UserRegisterTestCase(unittest.TestCase): self.assertEqual('Invalid username', channel.json_body["error"]) # Must not have null bytes - body = json.dumps({"nonce": nonce(), "username": b"abcd\x00"}) + body = json.dumps({"nonce": nonce(), "username": u"abcd\u0000"}) request, channel = make_request("POST", self.url, body.encode('utf8')) render(request, self.resource, self.clock) @@ -289,7 +289,7 @@ class UserRegisterTestCase(unittest.TestCase): self.assertEqual('Invalid password', channel.json_body["error"]) # Must not have null bytes - body = json.dumps({"nonce": nonce(), "username": "a", "password": b"abcd\x00"}) + body = json.dumps({"nonce": nonce(), "username": "a", "password": u"abcd\u0000"}) request, channel = make_request("POST", self.url, body.encode('utf8')) render(request, self.resource, self.clock) diff --git a/tests/rest/client/v1/test_profile.py b/tests/rest/client/v1/test_profile.py index d71cc8e0db..0516ce3cfb 100644 --- a/tests/rest/client/v1/test_profile.py +++ b/tests/rest/client/v1/test_profile.py @@ -80,7 +80,7 @@ class ProfileTestCase(unittest.TestCase): (code, response) = yield self.mock_resource.trigger( "PUT", "/profile/%s/displayname" % (myid), - '{"displayname": "Frank Jr."}' + b'{"displayname": "Frank Jr."}' ) self.assertEquals(200, code) @@ -95,7 +95,7 @@ class ProfileTestCase(unittest.TestCase): (code, response) = yield self.mock_resource.trigger( "PUT", "/profile/%s/displayname" % ("@4567:test"), - '{"displayname": "Frank Jr."}' + b'{"displayname": "Frank Jr."}' ) self.assertTrue( @@ -122,7 +122,7 @@ class ProfileTestCase(unittest.TestCase): (code, response) = yield self.mock_resource.trigger( "PUT", "/profile/%s/displayname" % ("@opaque:elsewhere"), - '{"displayname":"bob"}' + b'{"displayname":"bob"}' ) self.assertTrue( @@ -151,7 +151,7 @@ class ProfileTestCase(unittest.TestCase): (code, response) = yield self.mock_resource.trigger( "PUT", "/profile/%s/avatar_url" % (myid), - '{"avatar_url": "http://my.server/pic.gif"}' + b'{"avatar_url": "http://my.server/pic.gif"}' ) self.assertEquals(200, code) diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py index 41de8e0762..e3bc5f378d 100644 --- a/tests/rest/client/v1/utils.py +++ b/tests/rest/client/v1/utils.py @@ -105,7 +105,7 @@ class RestTestCase(unittest.TestCase): "password": "test", "type": "m.login.password" })) - self.assertEquals(200, code) + self.assertEquals(200, code, msg=response) defer.returnValue(response) @defer.inlineCallbacks @@ -149,14 +149,14 @@ class RestHelper(object): def create_room_as(self, room_creator, is_public=True, tok=None): temp_id = self.auth_user_id self.auth_user_id = room_creator - path = b"/_matrix/client/r0/createRoom" + path = "/_matrix/client/r0/createRoom" content = {} if not is_public: content["visibility"] = "private" if tok: - path = path + b"?access_token=%s" % tok.encode('ascii') + path = path + "?access_token=%s" % tok - request, channel = make_request(b"POST", path, json.dumps(content).encode('utf8')) + request, channel = make_request("POST", path, json.dumps(content).encode('utf8')) request.render(self.resource) wait_until_result(self.hs.get_reactor(), channel) @@ -205,7 +205,7 @@ class RestHelper(object): data = {"membership": membership} request, channel = make_request( - b"PUT", path.encode('ascii'), json.dumps(data).encode('utf8') + "PUT", path, json.dumps(data).encode('utf8') ) request.render(self.resource) diff --git a/tests/rest/client/v2_alpha/test_filter.py b/tests/rest/client/v2_alpha/test_filter.py index e890f0feac..de33b10a5f 100644 --- a/tests/rest/client/v2_alpha/test_filter.py +++ b/tests/rest/client/v2_alpha/test_filter.py @@ -33,7 +33,7 @@ PATH_PREFIX = "/_matrix/client/v2_alpha" class FilterTestCase(unittest.TestCase): - USER_ID = b"@apple:test" + USER_ID = "@apple:test" EXAMPLE_FILTER = {"room": {"timeline": {"types": ["m.room.message"]}}} EXAMPLE_FILTER_JSON = b'{"room": {"timeline": {"types": ["m.room.message"]}}}' TO_REGISTER = [filter] @@ -72,8 +72,8 @@ class FilterTestCase(unittest.TestCase): def test_add_filter(self): request, channel = make_request( - b"POST", - b"/_matrix/client/r0/user/%s/filter" % (self.USER_ID), + "POST", + "/_matrix/client/r0/user/%s/filter" % (self.USER_ID), self.EXAMPLE_FILTER_JSON, ) request.render(self.resource) @@ -87,8 +87,8 @@ class FilterTestCase(unittest.TestCase): def test_add_filter_for_other_user(self): request, channel = make_request( - b"POST", - b"/_matrix/client/r0/user/%s/filter" % (b"@watermelon:test"), + "POST", + "/_matrix/client/r0/user/%s/filter" % ("@watermelon:test"), self.EXAMPLE_FILTER_JSON, ) request.render(self.resource) @@ -101,8 +101,8 @@ class FilterTestCase(unittest.TestCase): _is_mine = self.hs.is_mine self.hs.is_mine = lambda target_user: False request, channel = make_request( - b"POST", - b"/_matrix/client/r0/user/%s/filter" % (self.USER_ID), + "POST", + "/_matrix/client/r0/user/%s/filter" % (self.USER_ID), self.EXAMPLE_FILTER_JSON, ) request.render(self.resource) @@ -119,7 +119,7 @@ class FilterTestCase(unittest.TestCase): self.clock.advance(1) filter_id = filter_id.result request, channel = make_request( - b"GET", b"/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id) + "GET", "/_matrix/client/r0/user/%s/filter/%s" % (self.USER_ID, filter_id) ) request.render(self.resource) wait_until_result(self.clock, channel) @@ -129,7 +129,7 @@ class FilterTestCase(unittest.TestCase): def test_get_filter_non_existant(self): request, channel = make_request( - b"GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID) + "GET", "/_matrix/client/r0/user/%s/filter/12382148321" % (self.USER_ID) ) request.render(self.resource) wait_until_result(self.clock, channel) @@ -141,7 +141,7 @@ class FilterTestCase(unittest.TestCase): # in errors.py def test_get_filter_invalid_id(self): request, channel = make_request( - b"GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID) + "GET", "/_matrix/client/r0/user/%s/filter/foobar" % (self.USER_ID) ) request.render(self.resource) wait_until_result(self.clock, channel) @@ -151,7 +151,7 @@ class FilterTestCase(unittest.TestCase): # No ID also returns an invalid_id error def test_get_filter_no_id(self): request, channel = make_request( - b"GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID) + "GET", "/_matrix/client/r0/user/%s/filter/" % (self.USER_ID) ) request.render(self.resource) wait_until_result(self.clock, channel) diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index e004d8fc73..f6293f11a8 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -81,7 +81,7 @@ class RegisterRestServletTestCase(unittest.TestCase): "access_token": token, "home_server": self.hs.hostname, } - self.assertDictContainsSubset(det_data, json.loads(channel.result["body"])) + self.assertDictContainsSubset(det_data, channel.json_body) def test_POST_appservice_registration_invalid(self): self.appservice = None # no application service exists @@ -102,7 +102,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.assertEquals(channel.result["code"], b"400", channel.result) self.assertEquals( - json.loads(channel.result["body"])["error"], "Invalid password" + channel.json_body["error"], "Invalid password" ) def test_POST_bad_username(self): @@ -113,7 +113,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.assertEquals(channel.result["code"], b"400", channel.result) self.assertEquals( - json.loads(channel.result["body"])["error"], "Invalid username" + channel.json_body["error"], "Invalid username" ) def test_POST_user_valid(self): @@ -140,7 +140,7 @@ class RegisterRestServletTestCase(unittest.TestCase): "device_id": device_id, } self.assertEquals(channel.result["code"], b"200", channel.result) - self.assertDictContainsSubset(det_data, json.loads(channel.result["body"])) + self.assertDictContainsSubset(det_data, channel.json_body) self.auth_handler.get_login_tuple_for_user_id( user_id, device_id=device_id, initial_device_display_name=None ) @@ -158,7 +158,7 @@ class RegisterRestServletTestCase(unittest.TestCase): self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals( - json.loads(channel.result["body"])["error"], + channel.json_body["error"], "Registration has been disabled", ) @@ -178,7 +178,7 @@ class RegisterRestServletTestCase(unittest.TestCase): "device_id": "guest_device", } self.assertEquals(channel.result["code"], b"200", channel.result) - self.assertDictContainsSubset(det_data, json.loads(channel.result["body"])) + self.assertDictContainsSubset(det_data, channel.json_body) def test_POST_disabled_guest_registration(self): self.hs.config.allow_guest_access = False @@ -189,5 +189,5 @@ class RegisterRestServletTestCase(unittest.TestCase): self.assertEquals(channel.result["code"], b"403", channel.result) self.assertEquals( - json.loads(channel.result["body"])["error"], "Guest access is disabled" + channel.json_body["error"], "Guest access is disabled" ) diff --git a/tests/rest/client/v2_alpha/test_sync.py b/tests/rest/client/v2_alpha/test_sync.py index 03ec3993b2..bafc0d1df0 100644 --- a/tests/rest/client/v2_alpha/test_sync.py +++ b/tests/rest/client/v2_alpha/test_sync.py @@ -32,7 +32,7 @@ PATH_PREFIX = "/_matrix/client/v2_alpha" class FilterTestCase(unittest.TestCase): - USER_ID = b"@apple:test" + USER_ID = "@apple:test" TO_REGISTER = [sync] def setUp(self): @@ -68,7 +68,7 @@ class FilterTestCase(unittest.TestCase): r.register_servlets(self.hs, self.resource) def test_sync_argless(self): - request, channel = make_request(b"GET", b"/_matrix/client/r0/sync") + request, channel = make_request("GET", "/_matrix/client/r0/sync") request.render(self.resource) wait_until_result(self.clock, channel) diff --git a/tests/server.py b/tests/server.py index c611dd6059..e249668d21 100644 --- a/tests/server.py +++ b/tests/server.py @@ -11,6 +11,7 @@ from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactorClock from synapse.http.site import SynapseRequest +from synapse.util import Clock from tests.utils import setup_test_homeserver as _sth @@ -28,7 +29,13 @@ class FakeChannel(object): def json_body(self): if not self.result: raise Exception("No result yet.") - return json.loads(self.result["body"]) + return json.loads(self.result["body"].decode('utf8')) + + @property + def code(self): + if not self.result: + raise Exception("No result yet.") + return int(self.result["code"]) def writeHeaders(self, version, code, reason, headers): self.result["version"] = version @@ -79,11 +86,16 @@ def make_request(method, path, content=b""): Make a web request using the given method and path, feed it the content, and return the Request and the Channel underneath. """ + if not isinstance(method, bytes): + method = method.encode('ascii') + + if not isinstance(path, bytes): + path = path.encode('ascii') # Decorate it to be the full path if not path.startswith(b"/_matrix"): path = b"/_matrix/client/r0/" + path - path = path.replace("//", "/") + path = path.replace(b"//", b"/") if isinstance(content, text_type): content = content.encode('utf8') @@ -191,3 +203,9 @@ def setup_test_homeserver(*args, **kwargs): clock.threadpool = ThreadPool() pool.threadpool = ThreadPool() return d + + +def get_clock(): + clock = ThreadedMemoryReactorClock() + hs_clock = Clock(clock) + return (clock, hs_clock) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 30683e7888..69412c5aad 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -49,7 +49,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.TestCase): 'INSERT INTO event_reference_hashes ' '(event_id, algorithm, hash) ' "VALUES (?, 'sha256', ?)" - ), (event_id, 'ffff')) + ), (event_id, b'ffff')) for i in range(0, 11): yield self.store.runInteraction("insert", insert_event, i) diff --git a/tests/storage/test_state.py b/tests/storage/test_state.py index 7a76d67b8c..f7871cd426 100644 --- a/tests/storage/test_state.py +++ b/tests/storage/test_state.py @@ -176,7 +176,7 @@ class StateStoreTestCase(tests.unittest.TestCase): room_id = self.room.to_string() group_ids = yield self.store.get_state_groups_ids(room_id, [e5.event_id]) - group = group_ids.keys()[0] + group = list(group_ids.keys())[0] # test _get_some_state_from_cache correctly filters out members with types=[] (state_dict, is_all) = yield self.store._get_some_state_from_cache( diff --git a/tests/test_server.py b/tests/test_server.py index 7e063c0290..fc396226ea 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -1,4 +1,3 @@ -import json import re from twisted.internet.defer import Deferred @@ -104,9 +103,8 @@ class JsonResourceTests(unittest.TestCase): request.render(res) self.assertEqual(channel.result["code"], b'403') - reply_body = json.loads(channel.result["body"]) - self.assertEqual(reply_body["error"], "Forbidden!!one!") - self.assertEqual(reply_body["errcode"], "M_FORBIDDEN") + self.assertEqual(channel.json_body["error"], "Forbidden!!one!") + self.assertEqual(channel.json_body["errcode"], "M_FORBIDDEN") def test_no_handler(self): """ @@ -126,6 +124,5 @@ class JsonResourceTests(unittest.TestCase): request.render(res) self.assertEqual(channel.result["code"], b'400') - reply_body = json.loads(channel.result["body"]) - self.assertEqual(reply_body["error"], "Unrecognized request") - self.assertEqual(reply_body["errcode"], "M_UNRECOGNIZED") + self.assertEqual(channel.json_body["error"], "Unrecognized request") + self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") diff --git a/tests/utils.py b/tests/utils.py index d378de7b41..3f17304934 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -155,8 +155,9 @@ def setup_test_homeserver(name="test", datastore=None, config=None, reactor=None # Need to let the HS build an auth handler and then mess with it # because AuthHandler's constructor requires the HS, so we can't make one # beforehand and pass it in to the HS's constructor (chicken / egg) - hs.get_auth_handler().hash = lambda p: hashlib.md5(p).hexdigest() - hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5(p).hexdigest() == h + hs.get_auth_handler().hash = lambda p: hashlib.md5(p.encode('utf8')).hexdigest() + hs.get_auth_handler().validate_hash = lambda p, h: hashlib.md5( + p.encode('utf8')).hexdigest() == h fed = kargs.get("resource_for_federation", None) if fed: @@ -229,8 +230,8 @@ class MockHttpResource(HttpServer): mock_content.configure_mock(**config) mock_request.content = mock_content - mock_request.method = http_method - mock_request.uri = path + mock_request.method = http_method.encode('ascii') + mock_request.uri = path.encode('ascii') mock_request.getClientIP.return_value = "-" |