From 4b965c862dc66c0da5d3240add70e9b5f0aa720b Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 14 Apr 2021 16:34:27 +0200 Subject: Remove redundant "coding: utf-8" lines (#9786) Part of #9744 Removes all redundant `# -*- coding: utf-8 -*-` lines from files, as python 3 automatically reads source code as utf-8 now. `Signed-off-by: Jonathan de Jong ` --- synapse/storage/databases/main/__init__.py | 1 - synapse/storage/databases/main/account_data.py | 1 - synapse/storage/databases/main/appservice.py | 1 - synapse/storage/databases/main/cache.py | 1 - synapse/storage/databases/main/censor_events.py | 1 - synapse/storage/databases/main/client_ips.py | 1 - synapse/storage/databases/main/deviceinbox.py | 1 - synapse/storage/databases/main/devices.py | 1 - synapse/storage/databases/main/directory.py | 1 - synapse/storage/databases/main/e2e_room_keys.py | 1 - synapse/storage/databases/main/end_to_end_keys.py | 1 - synapse/storage/databases/main/event_federation.py | 1 - synapse/storage/databases/main/event_push_actions.py | 1 - synapse/storage/databases/main/events.py | 1 - synapse/storage/databases/main/events_bg_updates.py | 1 - synapse/storage/databases/main/events_forward_extremities.py | 1 - synapse/storage/databases/main/events_worker.py | 1 - synapse/storage/databases/main/filtering.py | 1 - synapse/storage/databases/main/group_server.py | 1 - synapse/storage/databases/main/keys.py | 1 - synapse/storage/databases/main/media_repository.py | 1 - synapse/storage/databases/main/metrics.py | 1 - synapse/storage/databases/main/monthly_active_users.py | 1 - synapse/storage/databases/main/presence.py | 1 - synapse/storage/databases/main/profile.py | 1 - synapse/storage/databases/main/purge_events.py | 1 - synapse/storage/databases/main/push_rule.py | 1 - synapse/storage/databases/main/pusher.py | 1 - synapse/storage/databases/main/receipts.py | 1 - synapse/storage/databases/main/registration.py | 1 - synapse/storage/databases/main/rejections.py | 1 - synapse/storage/databases/main/relations.py | 1 - synapse/storage/databases/main/room.py | 1 - synapse/storage/databases/main/roommember.py | 1 - .../databases/main/schema/delta/50/make_event_content_nullable.py | 1 - .../storage/databases/main/schema/delta/57/local_current_membership.py | 1 - synapse/storage/databases/main/search.py | 1 - synapse/storage/databases/main/signatures.py | 1 - synapse/storage/databases/main/state.py | 1 - synapse/storage/databases/main/state_deltas.py | 1 - synapse/storage/databases/main/stats.py | 1 - synapse/storage/databases/main/stream.py | 1 - synapse/storage/databases/main/tags.py | 1 - synapse/storage/databases/main/transactions.py | 1 - synapse/storage/databases/main/ui_auth.py | 1 - synapse/storage/databases/main/user_directory.py | 1 - synapse/storage/databases/main/user_erasure_store.py | 1 - 47 files changed, 47 deletions(-) (limited to 'synapse/storage/databases/main') diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index b3d16ca7ac..5c50f5f950 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # Copyright 2019-2021 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index a277a1ef13..1d02795f43 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 85bb853d33..9f182c2a89 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 1e7637a6f5..ecc1f935e2 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index 3e26d5ba87..f22c1f241b 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/client_ips.py b/synapse/storage/databases/main/client_ips.py index ea3c15fd0e..d60010e942 100644 --- a/synapse/storage/databases/main/client_ips.py +++ b/synapse/storage/databases/main/client_ips.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 691080ce74..7c9d1f744e 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9bf8ba888f..b204875580 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/directory.py b/synapse/storage/databases/main/directory.py index 267b948397..86075bc55b 100644 --- a/synapse/storage/databases/main/directory.py +++ b/synapse/storage/databases/main/directory.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/e2e_room_keys.py b/synapse/storage/databases/main/e2e_room_keys.py index 12cecceec2..b15fb71e62 100644 --- a/synapse/storage/databases/main/e2e_room_keys.py +++ b/synapse/storage/databases/main/e2e_room_keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 New Vector Ltd # Copyright 2019 Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/end_to_end_keys.py b/synapse/storage/databases/main/end_to_end_keys.py index f1e7859d26..88afe97c41 100644 --- a/synapse/storage/databases/main/end_to_end_keys.py +++ b/synapse/storage/databases/main/end_to_end_keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index a956be491a..32ce70a396 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 78245ad5bd..5845322118 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index ad17123915..bed4326d11 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018-2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py index 79e7df6ca9..cbe4be1437 100644 --- a/synapse/storage/databases/main/events_bg_updates.py +++ b/synapse/storage/databases/main/events_bg_updates.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/events_forward_extremities.py b/synapse/storage/databases/main/events_forward_extremities.py index b3703ae161..6d2688d711 100644 --- a/synapse/storage/databases/main/events_forward_extremities.py +++ b/synapse/storage/databases/main/events_forward_extremities.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2021 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index c00780969f..64d70785b8 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/filtering.py b/synapse/storage/databases/main/filtering.py index d2f5b9a502..bb244a03c0 100644 --- a/synapse/storage/databases/main/filtering.py +++ b/synapse/storage/databases/main/filtering.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/group_server.py b/synapse/storage/databases/main/group_server.py index bd7826f4e9..66ad363bfb 100644 --- a/synapse/storage/databases/main/group_server.py +++ b/synapse/storage/databases/main/group_server.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index d504323b03..0e86807834 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 New Vector Ltd. # diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py index b7820ac7ff..c584868188 100644 --- a/synapse/storage/databases/main/media_repository.py +++ b/synapse/storage/databases/main/media_repository.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2020-2021 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 614a418a15..c3f551d377 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index 757da3d55d..fe25638289 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index 0ff693a310..c207d917b1 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py index ba01d3108a..9b4e95e134 100644 --- a/synapse/storage/databases/main/profile.py +++ b/synapse/storage/databases/main/profile.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 41f4fe7f95..8f83748b5e 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 9e58dc0e6a..db52176337 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index c65558c280..b48fe086d4 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 43c852c96c..3647276acb 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 90a8f664ef..833214b7e0 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017-2018 New Vector Ltd # Copyright 2019,2020 The Matrix.org Foundation C.I.C. diff --git a/synapse/storage/databases/main/rejections.py b/synapse/storage/databases/main/rejections.py index 1e361aaa9a..167318b314 100644 --- a/synapse/storage/databases/main/rejections.py +++ b/synapse/storage/databases/main/rejections.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py index 5cd61547f7..2bbf6d6a95 100644 --- a/synapse/storage/databases/main/relations.py +++ b/synapse/storage/databases/main/relations.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2019 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 47fb12f3f6..5f38634f48 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index a9216ca9ae..ef5587f87a 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py index b1684a8441..acd6ad1e1f 100644 --- a/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py +++ b/synapse/storage/databases/main/schema/delta/50/make_event_content_nullable.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py index 44917f0a2e..66989222e6 100644 --- a/synapse/storage/databases/main/schema/delta/57/local_current_membership.py +++ b/synapse/storage/databases/main/schema/delta/57/local_current_membership.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index f5e7d9ef98..0276f30656 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/signatures.py b/synapse/storage/databases/main/signatures.py index c8c67953e4..ab2159c2d3 100644 --- a/synapse/storage/databases/main/signatures.py +++ b/synapse/storage/databases/main/signatures.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py index 93431efe00..1757064a68 100644 --- a/synapse/storage/databases/main/state.py +++ b/synapse/storage/databases/main/state.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2020 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/state_deltas.py b/synapse/storage/databases/main/state_deltas.py index 0dbb501f16..bff7d0404f 100644 --- a/synapse/storage/databases/main/state_deltas.py +++ b/synapse/storage/databases/main/state_deltas.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py index bce8946c21..ae9f880965 100644 --- a/synapse/storage/databases/main/stats.py +++ b/synapse/storage/databases/main/stats.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd # Copyright 2019 The Matrix.org Foundation C.I.C. # diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 91f8abb67d..db5ce4ea01 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2017 Vector Creations Ltd # Copyright 2018-2019 New Vector Ltd diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index 50067eabfc..1d62c6140f 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # Copyright 2018 New Vector Ltd # diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index b7072f1f5e..82335e7a9d 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/ui_auth.py b/synapse/storage/databases/main/ui_auth.py index 5473ec1485..22c05cdde7 100644 --- a/synapse/storage/databases/main/ui_auth.py +++ b/synapse/storage/databases/main/ui_auth.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2020 Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 1026f321e5..7a082fdd21 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2017 Vector Creations Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/storage/databases/main/user_erasure_store.py b/synapse/storage/databases/main/user_erasure_store.py index f9575b1f1f..acf6b2fb64 100644 --- a/synapse/storage/databases/main/user_erasure_store.py +++ b/synapse/storage/databases/main/user_erasure_store.py @@ -1,4 +1,3 @@ -# -*- coding: utf-8 -*- # Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); -- cgit 1.5.1 From 05e8c70c059f8ebb066e029bc3aa3e0cefef1019 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Wed, 14 Apr 2021 18:19:02 +0200 Subject: Experimental Federation Speedup (#9702) This basically speeds up federation by "squeezing" each individual dual database call (to destinations and destination_rooms), which previously happened per every event, into one call for an entire batch (100 max). Signed-off-by: Jonathan de Jong --- changelog.d/9702.misc | 1 + contrib/experiments/test_messaging.py | 42 ++++--- synapse/federation/sender/__init__.py | 140 ++++++++++++--------- synapse/federation/sender/per_destination_queue.py | 15 ++- synapse/storage/databases/main/transactions.py | 28 ++--- 5 files changed, 129 insertions(+), 97 deletions(-) create mode 100644 changelog.d/9702.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9702.misc b/changelog.d/9702.misc new file mode 100644 index 0000000000..c6e63450a9 --- /dev/null +++ b/changelog.d/9702.misc @@ -0,0 +1 @@ +Speed up federation transmission by using fewer database calls. Contributed by @ShadowJonathan. diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py index 31b8a68225..5dd172052b 100644 --- a/contrib/experiments/test_messaging.py +++ b/contrib/experiments/test_messaging.py @@ -224,14 +224,16 @@ class HomeServer(ReplicationHandler): destinations = yield self.get_servers_for_context(room_name) try: - yield self.replication_layer.send_pdu( - Pdu.create_new( - context=room_name, - pdu_type="sy.room.message", - content={"sender": sender, "body": body}, - origin=self.server_name, - destinations=destinations, - ) + yield self.replication_layer.send_pdus( + [ + Pdu.create_new( + context=room_name, + pdu_type="sy.room.message", + content={"sender": sender, "body": body}, + origin=self.server_name, + destinations=destinations, + ) + ] ) except Exception as e: logger.exception(e) @@ -253,7 +255,7 @@ class HomeServer(ReplicationHandler): origin=self.server_name, destinations=destinations, ) - yield self.replication_layer.send_pdu(pdu) + yield self.replication_layer.send_pdus([pdu]) except Exception as e: logger.exception(e) @@ -265,16 +267,18 @@ class HomeServer(ReplicationHandler): destinations = yield self.get_servers_for_context(room_name) try: - yield self.replication_layer.send_pdu( - Pdu.create_new( - context=room_name, - is_state=True, - pdu_type="sy.room.member", - state_key=invitee, - content={"membership": "invite"}, - origin=self.server_name, - destinations=destinations, - ) + yield self.replication_layer.send_pdus( + [ + Pdu.create_new( + context=room_name, + is_state=True, + pdu_type="sy.room.member", + state_key=invitee, + content={"membership": "invite"}, + origin=self.server_name, + destinations=destinations, + ) + ] ) except Exception as e: logger.exception(e) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 155161685d..952ad39f8c 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -18,8 +18,6 @@ from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, from prometheus_client import Counter -from twisted.internet import defer - import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase @@ -27,11 +25,7 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.handlers.presence import get_interested_remotes -from synapse.logging.context import ( - make_deferred_yieldable, - preserve_fn, - run_in_background, -) +from synapse.logging.context import preserve_fn from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -39,7 +33,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure, measure_func if TYPE_CHECKING: @@ -276,15 +270,27 @@ class FederationSender(AbstractFederationSender): if not events and next_token >= self._last_poked_id: break - async def handle_event(event: EventBase) -> None: + async def get_destinations_for_event( + event: EventBase, + ) -> Collection[str]: + """Computes the destinations to which this event must be sent. + + This returns an empty tuple when there are no destinations to send to, + or if this event is not from this homeserver and it is not sending + it on behalf of another server. + + Will also filter out destinations which this sender is not responsible for, + if multiple federation senders exist. + """ + # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) if not is_mine and send_on_behalf_of is None: - return + return () if not event.internal_metadata.should_proactively_send(): - return + return () destinations = None # type: Optional[Set[str]] if not event.prev_event_ids(): @@ -319,7 +325,7 @@ class FederationSender(AbstractFederationSender): "Failed to calculate hosts in room for event: %s", event.event_id, ) - return + return () destinations = { d @@ -329,17 +335,15 @@ class FederationSender(AbstractFederationSender): ) } + destinations.discard(self.server_name) + if send_on_behalf_of is not None: # If we are sending the event on behalf of another server # then it already has the event and there is no reason to # send the event to it. destinations.discard(send_on_behalf_of) - logger.debug("Sending %s to %r", event, destinations) - if destinations: - await self._send_pdu(event, destinations) - now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -347,24 +351,29 @@ class FederationSender(AbstractFederationSender): "federation_sender" ).observe((now - ts) / 1000) - async def handle_room_events(events: Iterable[EventBase]) -> None: - with Measure(self.clock, "handle_room_events"): - for event in events: - await handle_event(event) - - events_by_room = {} # type: Dict[str, List[EventBase]] - for event in events: - events_by_room.setdefault(event.room_id, []).append(event) - - await make_deferred_yieldable( - defer.gatherResults( - [ - run_in_background(handle_room_events, evs) - for evs in events_by_room.values() - ], - consumeErrors=True, - ) - ) + return destinations + return () + + async def get_federatable_events_and_destinations( + events: Iterable[EventBase], + ) -> List[Tuple[EventBase, Collection[str]]]: + with Measure(self.clock, "get_destinations_for_events"): + # Fetch federation destinations per event, + # skip if get_destinations_for_event returns an empty collection, + # return list of event->destinations pairs. + return [ + (event, dests) + for (event, dests) in [ + (event, await get_destinations_for_event(event)) + for event in events + ] + if dests + ] + + events_and_dests = await get_federatable_events_and_destinations(events) + + # Send corresponding events to each destination queue + await self._distribute_events(events_and_dests) await self.store.update_federation_out_pos("events", next_token) @@ -382,7 +391,7 @@ class FederationSender(AbstractFederationSender): events_processed_counter.inc(len(events)) event_processing_loop_room_count.labels("federation_sender").inc( - len(events_by_room) + len({event.room_id for event in events}) ) event_processing_loop_counter.labels("federation_sender").inc() @@ -394,34 +403,53 @@ class FederationSender(AbstractFederationSender): finally: self._is_processing = False - async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: - # We loop through all destinations to see whether we already have - # a transaction in progress. If we do, stick it in the pending_pdus - # table and we'll get back to it later. + async def _distribute_events( + self, + events_and_dests: Iterable[Tuple[EventBase, Collection[str]]], + ) -> None: + """Distribute events to the respective per_destination queues. - destinations = set(destinations) - destinations.discard(self.server_name) - logger.debug("Sending to: %s", str(destinations)) + Also persists last-seen per-room stream_ordering to 'destination_rooms'. - if not destinations: - return + Args: + events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]). + Every event is paired with its intended destinations (in federation). + """ + # Tuples of room_id + destination to their max-seen stream_ordering + room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int] - sent_pdus_destination_dist_total.inc(len(destinations)) - sent_pdus_destination_dist_count.inc() + # List of events to send to each destination + events_by_dest = {} # type: Dict[str, List[EventBase]] - assert pdu.internal_metadata.stream_ordering + # For each event-destinations pair... + for event, destinations in events_and_dests: - # track the fact that we have a PDU for these destinations, - # to allow us to perform catch-up later on if the remote is unreachable - # for a while. - await self.store.store_destination_rooms_entries( - destinations, - pdu.room_id, - pdu.internal_metadata.stream_ordering, + # (we got this from the database, it's filled) + assert event.internal_metadata.stream_ordering + + sent_pdus_destination_dist_total.inc(len(destinations)) + sent_pdus_destination_dist_count.inc() + + # ...iterate over those destinations.. + for destination in destinations: + # ...update their stream-ordering... + room_with_dest_stream_ordering[(event.room_id, destination)] = max( + event.internal_metadata.stream_ordering, + room_with_dest_stream_ordering.get((event.room_id, destination), 0), + ) + + # ...and add the event to each destination queue. + events_by_dest.setdefault(destination, []).append(event) + + # Bulk-store destination_rooms stream_ids + await self.store.bulk_store_destination_rooms_entries( + room_with_dest_stream_ordering ) - for destination in destinations: - self._get_per_destination_queue(destination).send_pdu(pdu) + for destination, pdus in events_by_dest.items(): + logger.debug("Sending %d pdus to %s", len(pdus), destination) + + self._get_per_destination_queue(destination).send_pdus(pdus) async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3b053ebcfb..3bb66bce32 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -154,19 +154,22 @@ class PerDestinationQueue: + len(self._pending_edus_keyed) ) - def send_pdu(self, pdu: EventBase) -> None: - """Add a PDU to the queue, and start the transmission loop if necessary + def send_pdus(self, pdus: Iterable[EventBase]) -> None: + """Add PDUs to the queue, and start the transmission loop if necessary Args: - pdu: pdu to send + pdus: pdus to send """ if not self._catching_up or self._last_successful_stream_ordering is None: # only enqueue the PDU if we are not catching up (False) or do not # yet know if we have anything to catch up (None) - self._pending_pdus.append(pdu) + self._pending_pdus.extend(pdus) else: - assert pdu.internal_metadata.stream_ordering - self._catchup_last_skipped = pdu.internal_metadata.stream_ordering + self._catchup_last_skipped = max( + pdu.internal_metadata.stream_ordering + for pdu in pdus + if pdu.internal_metadata.stream_ordering is not None + ) self.attempt_new_transaction() diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 82335e7a9d..b28ca61f80 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ import logging from collections import namedtuple -from typing import Iterable, List, Optional, Tuple +from typing import Dict, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -295,37 +295,33 @@ class TransactionStore(TransactionWorkerStore): }, ) - async def store_destination_rooms_entries( - self, - destinations: Iterable[str], - room_id: str, - stream_ordering: int, - ) -> None: + async def bulk_store_destination_rooms_entries( + self, room_and_destination_to_ordering: Dict[Tuple[str, str], int] + ): """ - Updates or creates `destination_rooms` entries in batch for a single event. + Updates or creates `destination_rooms` entries for a number of events. Args: - destinations: list of destinations - room_id: the room_id of the event - stream_ordering: the stream_ordering of the event + room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id """ await self.db_pool.simple_upsert_many( table="destinations", key_names=("destination",), - key_values=[(d,) for d in destinations], + key_values={(d,) for _, d in room_and_destination_to_ordering.keys()}, value_names=[], value_values=[], desc="store_destination_rooms_entries_dests", ) - rows = [(destination, room_id) for destination in destinations] await self.db_pool.simple_upsert_many( table="destination_rooms", - key_names=("destination", "room_id"), - key_values=rows, + key_names=("room_id", "destination"), + key_values=list(room_and_destination_to_ordering.keys()), value_names=["stream_ordering"], - value_values=[(stream_ordering,)] * len(rows), + value_values=[ + (stream_id,) for stream_id in room_and_destination_to_ordering.values() + ], desc="store_destination_rooms_entries_rooms", ) -- cgit 1.5.1 From 601b893352838c1391da083e8edde62904d23208 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 16 Apr 2021 14:44:55 +0100 Subject: Small speed up joining large remote rooms (#9825) There are a couple of points in `persist_events` where we are doing a query per event in series, which we can replace. --- changelog.d/9825.misc | 1 + synapse/storage/databases/main/events.py | 54 +++++++++++++++++++------------- 2 files changed, 34 insertions(+), 21 deletions(-) create mode 100644 changelog.d/9825.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9825.misc b/changelog.d/9825.misc new file mode 100644 index 0000000000..42f3f15619 --- /dev/null +++ b/changelog.d/9825.misc @@ -0,0 +1 @@ +Small speed up for joining large remote rooms. diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index bed4326d11..a362521e20 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1378,17 +1378,21 @@ class PersistEventsStore: ], ) - for event, _ in events_and_contexts: - if not event.internal_metadata.is_redacted(): - # If we're persisting an unredacted event we go and ensure - # that we mark any redactions that reference this event as - # requiring censoring. - self.db_pool.simple_update_txn( - txn, - table="redactions", - keyvalues={"redacts": event.event_id}, - updatevalues={"have_censored": False}, + # If we're persisting an unredacted event we go and ensure + # that we mark any redactions that reference this event as + # requiring censoring. + sql = "UPDATE redactions SET have_censored = ? WHERE redacts = ?" + txn.execute_batch( + sql, + ( + ( + False, + event.event_id, ) + for event, _ in events_and_contexts + if not event.internal_metadata.is_redacted() + ), + ) state_events_and_contexts = [ ec for ec in events_and_contexts if ec[0].is_state() @@ -1881,20 +1885,28 @@ class PersistEventsStore: ), ) - for event, _ in events_and_contexts: - user_ids = self.db_pool.simple_select_onecol_txn( - txn, - table="event_push_actions_staging", - keyvalues={"event_id": event.event_id}, - retcol="user_id", - ) + room_to_event_ids = {} # type: Dict[str, List[str]] + for e, _ in events_and_contexts: + room_to_event_ids.setdefault(e.room_id, []).append(e.event_id) - for uid in user_ids: - txn.call_after( - self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid), + for room_id, event_ids in room_to_event_ids.items(): + rows = self.db_pool.simple_select_many_txn( + txn, + table="event_push_actions_staging", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("user_id",), ) + user_ids = {row["user_id"] for row in rows} + + for user_id in user_ids: + txn.call_after( + self.store.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id, user_id), + ) + # Now we delete the staging area for *all* events that were being # persisted. txn.execute_batch( -- cgit 1.5.1 From c571736c6ca5d1d2d9bf7cd9b717465d446ac7b3 Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Fri, 16 Apr 2021 18:17:18 +0100 Subject: User directory: use calculated room membership state instead (#9821) Fixes: #9797. Should help reduce CPU usage on the user directory, especially when memberships change in rooms with lots of state history. --- changelog.d/9821.misc | 1 + synapse/handlers/user_directory.py | 15 ++++++++------- synapse/storage/databases/main/roommember.py | 27 +++++++++++++++++++++++++++ 3 files changed, 36 insertions(+), 7 deletions(-) create mode 100644 changelog.d/9821.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9821.misc b/changelog.d/9821.misc new file mode 100644 index 0000000000..03b2d2ed4d --- /dev/null +++ b/changelog.d/9821.misc @@ -0,0 +1 @@ +Reduce CPU usage of the user directory by reusing existing calculated room membership. \ No newline at end of file diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py index 9b1e6d5c18..dacc4f3076 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py @@ -44,7 +44,6 @@ class UserDirectoryHandler(StateDeltasHandler): super().__init__(hs) self.store = hs.get_datastore() - self.state = hs.get_state_handler() self.server_name = hs.hostname self.clock = hs.get_clock() self.notifier = hs.get_notifier() @@ -302,10 +301,12 @@ class UserDirectoryHandler(StateDeltasHandler): # ignore the change return - users_with_profile = await self.state.get_current_users_in_room(room_id) + other_users_in_room_with_profiles = ( + await self.store.get_users_in_room_with_profiles(room_id) + ) # Remove every user from the sharing tables for that room. - for user_id in users_with_profile.keys(): + for user_id in other_users_in_room_with_profiles.keys(): await self.store.remove_user_who_share_room(user_id, room_id) # Then, re-add them to the tables. @@ -314,7 +315,7 @@ class UserDirectoryHandler(StateDeltasHandler): # which when ran over an entire room, will result in the same values # being added multiple times. The batching upserts shouldn't make this # too bad, though. - for user_id, profile in users_with_profile.items(): + for user_id, profile in other_users_in_room_with_profiles.items(): await self._handle_new_user(room_id, user_id, profile) async def _handle_new_user( @@ -336,7 +337,7 @@ class UserDirectoryHandler(StateDeltasHandler): room_id ) # Now we update users who share rooms with users. - users_with_profile = await self.state.get_current_users_in_room(room_id) + other_users_in_room = await self.store.get_users_in_room(room_id) if is_public: await self.store.add_users_in_public_rooms(room_id, (user_id,)) @@ -352,14 +353,14 @@ class UserDirectoryHandler(StateDeltasHandler): # We don't care about appservice users. if not is_appservice: - for other_user_id in users_with_profile: + for other_user_id in other_users_in_room: if user_id == other_user_id: continue to_insert.add((user_id, other_user_id)) # Next we need to update for every local user in the room - for other_user_id in users_with_profile: + for other_user_id in other_users_in_room: if user_id == other_user_id: continue diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index ef5587f87a..fd525dce65 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -173,6 +173,33 @@ class RoomMemberWorkerStore(EventsWorkerStore): txn.execute(sql, (room_id, Membership.JOIN)) return [r[0] for r in txn] + @cached(max_entries=100000, iterable=True) + async def get_users_in_room_with_profiles( + self, room_id: str + ) -> Dict[str, ProfileInfo]: + """Get a mapping from user ID to profile information for all users in a given room. + + Args: + room_id: The ID of the room to retrieve the users of. + + Returns: + A mapping from user ID to ProfileInfo. + """ + + def _get_users_in_room_with_profiles(txn) -> Dict[str, ProfileInfo]: + sql = """ + SELECT user_id, display_name, avatar_url FROM room_memberships + WHERE room_id = ? AND membership = ? + """ + txn.execute(sql, (room_id, Membership.JOIN)) + + return {r[0]: ProfileInfo(display_name=r[1], avatar_url=r[2]) for r in txn} + + return await self.db_pool.runInteraction( + "get_users_in_room_with_profiles", + _get_users_in_room_with_profiles, + ) + @cached(max_entries=100000) async def get_room_summary(self, room_id: str) -> Dict[str, MemberSummary]: """Get the details of a room roughly suitable for use by the room -- cgit 1.5.1 From 71f0623de968f07292d5a092e9197f7513ab6cde Mon Sep 17 00:00:00 2001 From: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com> Date: Mon, 19 Apr 2021 19:16:34 +0100 Subject: Port "Allow users to click account renewal links multiple times without hitting an 'Invalid Token' page #74" from synapse-dinsic (#9832) This attempts to be a direct port of https://github.com/matrix-org/synapse-dinsic/pull/74 to mainline. There was some fiddling required to deal with the changes that have been made to mainline since (mainly dealing with the split of `RegistrationWorkerStore` from `RegistrationStore`, and the changes made to `self.make_request` in test code). --- UPGRADE.rst | 23 +++ changelog.d/9832.feature | 1 + docs/sample_config.yaml | 148 ++++++++++-------- synapse/api/auth.py | 6 +- synapse/config/_base.pyi | 2 + synapse/config/account_validity.py | 165 +++++++++++++++++++++ synapse/config/emailconfig.py | 2 +- synapse/config/homeserver.py | 3 +- synapse/config/registration.py | 129 ---------------- synapse/handlers/account_validity.py | 101 ++++++++++--- synapse/handlers/deactivate_account.py | 4 +- synapse/push/pusherpool.py | 8 +- .../res/templates/account_previously_renewed.html | 1 + synapse/res/templates/account_renewed.html | 2 +- synapse/rest/client/v2_alpha/account_validity.py | 32 +++- synapse/storage/databases/main/registration.py | 62 ++++++-- .../59/12account_validity_token_used_ts_ms.sql | 18 +++ tests/rest/client/v2_alpha/test_register.py | 52 +++++-- 18 files changed, 496 insertions(+), 263 deletions(-) create mode 100644 changelog.d/9832.feature create mode 100644 synapse/config/account_validity.py create mode 100644 synapse/res/templates/account_previously_renewed.html create mode 100644 synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql (limited to 'synapse/storage/databases/main') diff --git a/UPGRADE.rst b/UPGRADE.rst index 665821d4ef..eff976017d 100644 --- a/UPGRADE.rst +++ b/UPGRADE.rst @@ -85,6 +85,29 @@ for example: wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb +Upgrading to v1.33.0 +==================== + +Account Validity HTML templates can now display a user's expiration date +------------------------------------------------------------------------ + +This may affect you if you have enabled the account validity feature, and have made use of a +custom HTML template specified by the ``account_validity.template_dir`` or ``account_validity.account_renewed_html_path`` +Synapse config options. + +The template can now accept an ``expiration_ts`` variable, which represents the unix timestamp in milliseconds for the +future date of which their account has been renewed until. See the +`default template `_ +for an example of usage. + +ALso note that a new HTML template, ``account_previously_renewed.html``, has been added. This is is shown to users +when they attempt to renew their account with a valid renewal token that has already been used before. The default +template contents can been found +`here `_, +and can also accept an ``expiration_ts`` variable. This template replaces the error message users would previously see +upon attempting to use a valid renewal token more than once. + + Upgrading to v1.32.0 ==================== diff --git a/changelog.d/9832.feature b/changelog.d/9832.feature new file mode 100644 index 0000000000..e76395fbe8 --- /dev/null +++ b/changelog.d/9832.feature @@ -0,0 +1 @@ +Don't return an error when a user attempts to renew their account multiple times with the same token. Instead, state when their account is set to expire. This change concerns the optional account validity feature. \ No newline at end of file diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml index 9182dcd987..d260d76259 100644 --- a/docs/sample_config.yaml +++ b/docs/sample_config.yaml @@ -1175,69 +1175,6 @@ url_preview_accept_language: # #enable_registration: false -# Optional account validity configuration. This allows for accounts to be denied -# any request after a given period. -# -# Once this feature is enabled, Synapse will look for registered users without an -# expiration date at startup and will add one to every account it found using the -# current settings at that time. -# This means that, if a validity period is set, and Synapse is restarted (it will -# then derive an expiration date from the current validity period), and some time -# after that the validity period changes and Synapse is restarted, the users' -# expiration dates won't be updated unless their account is manually renewed. This -# date will be randomly selected within a range [now + period - d ; now + period], -# where d is equal to 10% of the validity period. -# -account_validity: - # The account validity feature is disabled by default. Uncomment the - # following line to enable it. - # - #enabled: true - - # The period after which an account is valid after its registration. When - # renewing the account, its validity period will be extended by this amount - # of time. This parameter is required when using the account validity - # feature. - # - #period: 6w - - # The amount of time before an account's expiry date at which Synapse will - # send an email to the account's email address with a renewal link. By - # default, no such emails are sent. - # - # If you enable this setting, you will also need to fill out the 'email' and - # 'public_baseurl' configuration sections. - # - #renew_at: 1w - - # The subject of the email sent out with the renewal link. '%(app)s' can be - # used as a placeholder for the 'app_name' parameter from the 'email' - # section. - # - # Note that the placeholder must be written '%(app)s', including the - # trailing 's'. - # - # If this is not set, a default value is used. - # - #renew_email_subject: "Renew your %(app)s account" - - # Directory in which Synapse will try to find templates for the HTML files to - # serve to the user when trying to renew an account. If not set, default - # templates from within the Synapse package will be used. - # - #template_dir: "res/templates" - - # File within 'template_dir' giving the HTML to be displayed to the user after - # they successfully renewed their account. If not set, default text is used. - # - #account_renewed_html_path: "account_renewed.html" - - # File within 'template_dir' giving the HTML to be displayed when the user - # tries to renew an account with an invalid renewal token. If not set, - # default text is used. - # - #invalid_token_html_path: "invalid_token.html" - # Time that a user's session remains valid for, after they log in. # # Note that this is not currently compatible with guest logins. @@ -1432,6 +1369,91 @@ account_threepid_delegates: #auto_join_rooms_for_guests: false +## Account Validity ## + +# Optional account validity configuration. This allows for accounts to be denied +# any request after a given period. +# +# Once this feature is enabled, Synapse will look for registered users without an +# expiration date at startup and will add one to every account it found using the +# current settings at that time. +# This means that, if a validity period is set, and Synapse is restarted (it will +# then derive an expiration date from the current validity period), and some time +# after that the validity period changes and Synapse is restarted, the users' +# expiration dates won't be updated unless their account is manually renewed. This +# date will be randomly selected within a range [now + period - d ; now + period], +# where d is equal to 10% of the validity period. +# +account_validity: + # The account validity feature is disabled by default. Uncomment the + # following line to enable it. + # + #enabled: true + + # The period after which an account is valid after its registration. When + # renewing the account, its validity period will be extended by this amount + # of time. This parameter is required when using the account validity + # feature. + # + #period: 6w + + # The amount of time before an account's expiry date at which Synapse will + # send an email to the account's email address with a renewal link. By + # default, no such emails are sent. + # + # If you enable this setting, you will also need to fill out the 'email' and + # 'public_baseurl' configuration sections. + # + #renew_at: 1w + + # The subject of the email sent out with the renewal link. '%(app)s' can be + # used as a placeholder for the 'app_name' parameter from the 'email' + # section. + # + # Note that the placeholder must be written '%(app)s', including the + # trailing 's'. + # + # If this is not set, a default value is used. + # + #renew_email_subject: "Renew your %(app)s account" + + # Directory in which Synapse will try to find templates for the HTML files to + # serve to the user when trying to renew an account. If not set, default + # templates from within the Synapse package will be used. + # + # The currently available templates are: + # + # * account_renewed.html: Displayed to the user after they have successfully + # renewed their account. + # + # * account_previously_renewed.html: Displayed to the user if they attempt to + # renew their account with a token that is valid, but that has already + # been used. In this case the account is not renewed again. + # + # * invalid_token.html: Displayed to the user when they try to renew an account + # with an unknown or invalid renewal token. + # + # See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for + # default template contents. + # + # The file name of some of these templates can be configured below for legacy + # reasons. + # + #template_dir: "res/templates" + + # A custom file name for the 'account_renewed.html' template. + # + # If not set, the file is assumed to be named "account_renewed.html". + # + #account_renewed_html_path: "account_renewed.html" + + # A custom file name for the 'invalid_token.html' template. + # + # If not set, the file is assumed to be named "invalid_token.html". + # + #invalid_token_html_path: "invalid_token.html" + + ## Metrics ### # Enable collection and rendering of performance metrics diff --git a/synapse/api/auth.py b/synapse/api/auth.py index 6c13f53957..872fd100cd 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -79,7 +79,9 @@ class Auth: self._auth_blocking = AuthBlocking(self.hs) - self._account_validity = hs.config.account_validity + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) self._track_appservice_user_ips = hs.config.track_appservice_user_ips self._macaroon_secret_key = hs.config.macaroon_secret_key @@ -222,7 +224,7 @@ class Auth: shadow_banned = user_info.shadow_banned # Deny the request if the user account has expired. - if self._account_validity.enabled and not allow_expired: + if self._account_validity_enabled and not allow_expired: if await self.store.is_account_expired( user_info.user_id, self.clock.time_msec() ): diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi index e896fd34e2..ddec356a07 100644 --- a/synapse/config/_base.pyi +++ b/synapse/config/_base.pyi @@ -1,6 +1,7 @@ from typing import Any, Iterable, List, Optional from synapse.config import ( + account_validity, api, appservice, auth, @@ -59,6 +60,7 @@ class RootConfig: captcha: captcha.CaptchaConfig voip: voip.VoipConfig registration: registration.RegistrationConfig + account_validity: account_validity.AccountValidityConfig metrics: metrics.MetricsConfig api: api.ApiConfig appservice: appservice.AppServiceConfig diff --git a/synapse/config/account_validity.py b/synapse/config/account_validity.py new file mode 100644 index 0000000000..c58a7d95a7 --- /dev/null +++ b/synapse/config/account_validity.py @@ -0,0 +1,165 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.config._base import Config, ConfigError + + +class AccountValidityConfig(Config): + section = "account_validity" + + def read_config(self, config, **kwargs): + account_validity_config = config.get("account_validity") or {} + self.account_validity_enabled = account_validity_config.get("enabled", False) + self.account_validity_renew_by_email_enabled = ( + "renew_at" in account_validity_config + ) + + if self.account_validity_enabled: + if "period" in account_validity_config: + self.account_validity_period = self.parse_duration( + account_validity_config["period"] + ) + else: + raise ConfigError("'period' is required when using account validity") + + if "renew_at" in account_validity_config: + self.account_validity_renew_at = self.parse_duration( + account_validity_config["renew_at"] + ) + + if "renew_email_subject" in account_validity_config: + self.account_validity_renew_email_subject = account_validity_config[ + "renew_email_subject" + ] + else: + self.account_validity_renew_email_subject = "Renew your %(app)s account" + + self.account_validity_startup_job_max_delta = ( + self.account_validity_period * 10.0 / 100.0 + ) + + if self.account_validity_renew_by_email_enabled: + if not self.public_baseurl: + raise ConfigError("Can't send renewal emails without 'public_baseurl'") + + # Load account validity templates. + account_validity_template_dir = account_validity_config.get("template_dir") + + account_renewed_template_filename = account_validity_config.get( + "account_renewed_html_path", "account_renewed.html" + ) + invalid_token_template_filename = account_validity_config.get( + "invalid_token_html_path", "invalid_token.html" + ) + + # Read and store template content + ( + self.account_validity_account_renewed_template, + self.account_validity_account_previously_renewed_template, + self.account_validity_invalid_token_template, + ) = self.read_templates( + [ + account_renewed_template_filename, + "account_previously_renewed.html", + invalid_token_template_filename, + ], + account_validity_template_dir, + ) + + def generate_config_section(self, **kwargs): + return """\ + ## Account Validity ## + + # Optional account validity configuration. This allows for accounts to be denied + # any request after a given period. + # + # Once this feature is enabled, Synapse will look for registered users without an + # expiration date at startup and will add one to every account it found using the + # current settings at that time. + # This means that, if a validity period is set, and Synapse is restarted (it will + # then derive an expiration date from the current validity period), and some time + # after that the validity period changes and Synapse is restarted, the users' + # expiration dates won't be updated unless their account is manually renewed. This + # date will be randomly selected within a range [now + period - d ; now + period], + # where d is equal to 10% of the validity period. + # + account_validity: + # The account validity feature is disabled by default. Uncomment the + # following line to enable it. + # + #enabled: true + + # The period after which an account is valid after its registration. When + # renewing the account, its validity period will be extended by this amount + # of time. This parameter is required when using the account validity + # feature. + # + #period: 6w + + # The amount of time before an account's expiry date at which Synapse will + # send an email to the account's email address with a renewal link. By + # default, no such emails are sent. + # + # If you enable this setting, you will also need to fill out the 'email' and + # 'public_baseurl' configuration sections. + # + #renew_at: 1w + + # The subject of the email sent out with the renewal link. '%(app)s' can be + # used as a placeholder for the 'app_name' parameter from the 'email' + # section. + # + # Note that the placeholder must be written '%(app)s', including the + # trailing 's'. + # + # If this is not set, a default value is used. + # + #renew_email_subject: "Renew your %(app)s account" + + # Directory in which Synapse will try to find templates for the HTML files to + # serve to the user when trying to renew an account. If not set, default + # templates from within the Synapse package will be used. + # + # The currently available templates are: + # + # * account_renewed.html: Displayed to the user after they have successfully + # renewed their account. + # + # * account_previously_renewed.html: Displayed to the user if they attempt to + # renew their account with a token that is valid, but that has already + # been used. In this case the account is not renewed again. + # + # * invalid_token.html: Displayed to the user when they try to renew an account + # with an unknown or invalid renewal token. + # + # See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for + # default template contents. + # + # The file name of some of these templates can be configured below for legacy + # reasons. + # + #template_dir: "res/templates" + + # A custom file name for the 'account_renewed.html' template. + # + # If not set, the file is assumed to be named "account_renewed.html". + # + #account_renewed_html_path: "account_renewed.html" + + # A custom file name for the 'invalid_token.html' template. + # + # If not set, the file is assumed to be named "invalid_token.html". + # + #invalid_token_html_path: "invalid_token.html" + """ diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py index c587939c7a..5564d7d097 100644 --- a/synapse/config/emailconfig.py +++ b/synapse/config/emailconfig.py @@ -299,7 +299,7 @@ class EmailConfig(Config): "client_base_url", email_config.get("riot_base_url", None) ) - if self.account_validity.renew_by_email_enabled: + if self.account_validity_renew_by_email_enabled: expiry_template_html = email_config.get( "expiry_template_html", "notice_expiry.html" ) diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py index 1309535068..58e3bcd511 100644 --- a/synapse/config/homeserver.py +++ b/synapse/config/homeserver.py @@ -12,8 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - from ._base import RootConfig +from .account_validity import AccountValidityConfig from .api import ApiConfig from .appservice import AppServiceConfig from .auth import AuthConfig @@ -68,6 +68,7 @@ class HomeServerConfig(RootConfig): CaptchaConfig, VoipConfig, RegistrationConfig, + AccountValidityConfig, MetricsConfig, ApiConfig, AppServiceConfig, diff --git a/synapse/config/registration.py b/synapse/config/registration.py index f8a2768af8..e6f52b4f40 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -12,74 +12,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -import os - -import pkg_resources - from synapse.api.constants import RoomCreationPreset from synapse.config._base import Config, ConfigError from synapse.types import RoomAlias, UserID from synapse.util.stringutils import random_string_with_symbols, strtobool -class AccountValidityConfig(Config): - section = "accountvalidity" - - def __init__(self, config, synapse_config): - if config is None: - return - super().__init__() - self.enabled = config.get("enabled", False) - self.renew_by_email_enabled = "renew_at" in config - - if self.enabled: - if "period" in config: - self.period = self.parse_duration(config["period"]) - else: - raise ConfigError("'period' is required when using account validity") - - if "renew_at" in config: - self.renew_at = self.parse_duration(config["renew_at"]) - - if "renew_email_subject" in config: - self.renew_email_subject = config["renew_email_subject"] - else: - self.renew_email_subject = "Renew your %(app)s account" - - self.startup_job_max_delta = self.period * 10.0 / 100.0 - - if self.renew_by_email_enabled: - if "public_baseurl" not in synapse_config: - raise ConfigError("Can't send renewal emails without 'public_baseurl'") - - template_dir = config.get("template_dir") - - if not template_dir: - template_dir = pkg_resources.resource_filename("synapse", "res/templates") - - if "account_renewed_html_path" in config: - file_path = os.path.join(template_dir, config["account_renewed_html_path"]) - - self.account_renewed_html_content = self.read_file( - file_path, "account_validity.account_renewed_html_path" - ) - else: - self.account_renewed_html_content = ( - "Your account has been successfully renewed." - ) - - if "invalid_token_html_path" in config: - file_path = os.path.join(template_dir, config["invalid_token_html_path"]) - - self.invalid_token_html_content = self.read_file( - file_path, "account_validity.invalid_token_html_path" - ) - else: - self.invalid_token_html_content = ( - "Invalid renewal token." - ) - - class RegistrationConfig(Config): section = "registration" @@ -92,10 +30,6 @@ class RegistrationConfig(Config): str(config["disable_registration"]) ) - self.account_validity = AccountValidityConfig( - config.get("account_validity") or {}, config - ) - self.registrations_require_3pid = config.get("registrations_require_3pid", []) self.allowed_local_3pids = config.get("allowed_local_3pids", []) self.enable_3pid_lookup = config.get("enable_3pid_lookup", True) @@ -207,69 +141,6 @@ class RegistrationConfig(Config): # #enable_registration: false - # Optional account validity configuration. This allows for accounts to be denied - # any request after a given period. - # - # Once this feature is enabled, Synapse will look for registered users without an - # expiration date at startup and will add one to every account it found using the - # current settings at that time. - # This means that, if a validity period is set, and Synapse is restarted (it will - # then derive an expiration date from the current validity period), and some time - # after that the validity period changes and Synapse is restarted, the users' - # expiration dates won't be updated unless their account is manually renewed. This - # date will be randomly selected within a range [now + period - d ; now + period], - # where d is equal to 10%% of the validity period. - # - account_validity: - # The account validity feature is disabled by default. Uncomment the - # following line to enable it. - # - #enabled: true - - # The period after which an account is valid after its registration. When - # renewing the account, its validity period will be extended by this amount - # of time. This parameter is required when using the account validity - # feature. - # - #period: 6w - - # The amount of time before an account's expiry date at which Synapse will - # send an email to the account's email address with a renewal link. By - # default, no such emails are sent. - # - # If you enable this setting, you will also need to fill out the 'email' and - # 'public_baseurl' configuration sections. - # - #renew_at: 1w - - # The subject of the email sent out with the renewal link. '%%(app)s' can be - # used as a placeholder for the 'app_name' parameter from the 'email' - # section. - # - # Note that the placeholder must be written '%%(app)s', including the - # trailing 's'. - # - # If this is not set, a default value is used. - # - #renew_email_subject: "Renew your %%(app)s account" - - # Directory in which Synapse will try to find templates for the HTML files to - # serve to the user when trying to renew an account. If not set, default - # templates from within the Synapse package will be used. - # - #template_dir: "res/templates" - - # File within 'template_dir' giving the HTML to be displayed to the user after - # they successfully renewed their account. If not set, default text is used. - # - #account_renewed_html_path: "account_renewed.html" - - # File within 'template_dir' giving the HTML to be displayed when the user - # tries to renew an account with an invalid renewal token. If not set, - # default text is used. - # - #invalid_token_html_path: "invalid_token.html" - # Time that a user's session remains valid for, after they log in. # # Note that this is not currently compatible with guest logins. diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index 66ce7e8b83..5b927f10b3 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -17,7 +17,7 @@ import email.utils import logging from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.errors import StoreError, SynapseError from synapse.logging.context import make_deferred_yieldable @@ -39,28 +39,44 @@ class AccountValidityHandler: self.sendmail = self.hs.get_sendmail() self.clock = self.hs.get_clock() - self._account_validity = self.hs.config.account_validity + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) + self._account_validity_renew_by_email_enabled = ( + hs.config.account_validity.account_validity_renew_by_email_enabled + ) + + self._account_validity_period = None + if self._account_validity_enabled: + self._account_validity_period = ( + hs.config.account_validity.account_validity_period + ) if ( - self._account_validity.enabled - and self._account_validity.renew_by_email_enabled + self._account_validity_enabled + and self._account_validity_renew_by_email_enabled ): # Don't do email-specific configuration if renewal by email is disabled. - self._template_html = self.config.account_validity_template_html - self._template_text = self.config.account_validity_template_text + self._template_html = ( + hs.config.account_validity.account_validity_template_html + ) + self._template_text = ( + hs.config.account_validity.account_validity_template_text + ) + account_validity_renew_email_subject = ( + hs.config.account_validity.account_validity_renew_email_subject + ) try: - app_name = self.hs.config.email_app_name + app_name = hs.config.email_app_name - self._subject = self._account_validity.renew_email_subject % { - "app": app_name - } + self._subject = account_validity_renew_email_subject % {"app": app_name} - self._from_string = self.hs.config.email_notif_from % {"app": app_name} + self._from_string = hs.config.email_notif_from % {"app": app_name} except Exception: # If substitution failed, fall back to the bare strings. - self._subject = self._account_validity.renew_email_subject - self._from_string = self.hs.config.email_notif_from + self._subject = account_validity_renew_email_subject + self._from_string = hs.config.email_notif_from self._raw_from = email.utils.parseaddr(self._from_string)[1] @@ -220,50 +236,87 @@ class AccountValidityHandler: attempts += 1 raise StoreError(500, "Couldn't generate a unique string as refresh string.") - async def renew_account(self, renewal_token: str) -> bool: + async def renew_account(self, renewal_token: str) -> Tuple[bool, bool, int]: """Renews the account attached to a given renewal token by pushing back the expiration date by the current validity period in the server's configuration. + If it turns out that the token is valid but has already been used, then the + token is considered stale. A token is stale if the 'token_used_ts_ms' db column + is non-null. + Args: renewal_token: Token sent with the renewal request. Returns: - Whether the provided token is valid. + A tuple containing: + * A bool representing whether the token is valid and unused. + * A bool which is `True` if the token is valid, but stale. + * An int representing the user's expiry timestamp as milliseconds since the + epoch, or 0 if the token was invalid. """ try: - user_id = await self.store.get_user_from_renewal_token(renewal_token) + ( + user_id, + current_expiration_ts, + token_used_ts, + ) = await self.store.get_user_from_renewal_token(renewal_token) except StoreError: - return False + return False, False, 0 + + # Check whether this token has already been used. + if token_used_ts: + logger.info( + "User '%s' attempted to use previously used token '%s' to renew account", + user_id, + renewal_token, + ) + return False, True, current_expiration_ts logger.debug("Renewing an account for user %s", user_id) - await self.renew_account_for_user(user_id) - return True + # Renew the account. Pass the renewal_token here so that it is not cleared. + # We want to keep the token around in case the user attempts to renew their + # account with the same token twice (clicking the email link twice). + # + # In that case, the token will be accepted, but the account's expiration ts + # will remain unchanged. + new_expiration_ts = await self.renew_account_for_user( + user_id, renewal_token=renewal_token + ) + + return True, False, new_expiration_ts async def renew_account_for_user( self, user_id: str, expiration_ts: Optional[int] = None, email_sent: bool = False, + renewal_token: Optional[str] = None, ) -> int: """Renews the account attached to a given user by pushing back the expiration date by the current validity period in the server's configuration. Args: - renewal_token: Token sent with the renewal request. + user_id: The ID of the user to renew. expiration_ts: New expiration date. Defaults to now + validity period. - email_sen: Whether an email has been sent for this validity period. - Defaults to False. + email_sent: Whether an email has been sent for this validity period. + renewal_token: Token sent with the renewal request. The user's token + will be cleared if this is None. Returns: New expiration date for this account, as a timestamp in milliseconds since epoch. """ + now = self.clock.time_msec() if expiration_ts is None: - expiration_ts = self.clock.time_msec() + self._account_validity.period + expiration_ts = now + self._account_validity_period await self.store.set_account_validity_for_user( - user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent + user_id=user_id, + expiration_ts=expiration_ts, + email_sent=email_sent, + renewal_token=renewal_token, + token_used_ts=now, ) return expiration_ts diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py index 3f6f9f7f3d..45d2404dde 100644 --- a/synapse/handlers/deactivate_account.py +++ b/synapse/handlers/deactivate_account.py @@ -49,7 +49,9 @@ class DeactivateAccountHandler(BaseHandler): if hs.config.run_background_tasks: hs.get_reactor().callWhenRunning(self._start_user_parting) - self._account_validity_enabled = hs.config.account_validity.enabled + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) async def deactivate_account( self, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 564a5ed0df..579fcdf472 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -62,7 +62,9 @@ class PusherPool: self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() - self._account_validity = hs.config.account_validity + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) # We shard the handling of push notifications by user ID. self._pusher_shard_config = hs.config.push.pusher_shard_config @@ -236,7 +238,7 @@ class PusherPool: for u in users_affected: # Don't push if the user account has expired - if self._account_validity.enabled: + if self._account_validity_enabled: expired = await self.store.is_account_expired( u, self.clock.time_msec() ) @@ -266,7 +268,7 @@ class PusherPool: for u in users_affected: # Don't push if the user account has expired - if self._account_validity.enabled: + if self._account_validity_enabled: expired = await self.store.is_account_expired( u, self.clock.time_msec() ) diff --git a/synapse/res/templates/account_previously_renewed.html b/synapse/res/templates/account_previously_renewed.html new file mode 100644 index 0000000000..b751359bdf --- /dev/null +++ b/synapse/res/templates/account_previously_renewed.html @@ -0,0 +1 @@ +Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}. diff --git a/synapse/res/templates/account_renewed.html b/synapse/res/templates/account_renewed.html index 894da030af..e8c0f52f05 100644 --- a/synapse/res/templates/account_renewed.html +++ b/synapse/res/templates/account_renewed.html @@ -1 +1 @@ -Your account has been successfully renewed. +Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}. diff --git a/synapse/rest/client/v2_alpha/account_validity.py b/synapse/rest/client/v2_alpha/account_validity.py index 0ad07fb895..2d1ad3d3fb 100644 --- a/synapse/rest/client/v2_alpha/account_validity.py +++ b/synapse/rest/client/v2_alpha/account_validity.py @@ -36,24 +36,40 @@ class AccountValidityRenewServlet(RestServlet): self.hs = hs self.account_activity_handler = hs.get_account_validity_handler() self.auth = hs.get_auth() - self.success_html = hs.config.account_validity.account_renewed_html_content - self.failure_html = hs.config.account_validity.invalid_token_html_content + self.account_renewed_template = ( + hs.config.account_validity.account_validity_account_renewed_template + ) + self.account_previously_renewed_template = ( + hs.config.account_validity.account_validity_account_previously_renewed_template + ) + self.invalid_token_template = ( + hs.config.account_validity.account_validity_invalid_token_template + ) async def on_GET(self, request): if b"token" not in request.args: raise SynapseError(400, "Missing renewal token") renewal_token = request.args[b"token"][0] - token_valid = await self.account_activity_handler.renew_account( + ( + token_valid, + token_stale, + expiration_ts, + ) = await self.account_activity_handler.renew_account( renewal_token.decode("utf8") ) if token_valid: status_code = 200 - response = self.success_html + response = self.account_renewed_template.render(expiration_ts=expiration_ts) + elif token_stale: + status_code = 200 + response = self.account_previously_renewed_template.render( + expiration_ts=expiration_ts + ) else: status_code = 404 - response = self.failure_html + response = self.invalid_token_template.render(expiration_ts=expiration_ts) respond_with_html(request, status_code, response) @@ -71,10 +87,12 @@ class AccountValiditySendMailServlet(RestServlet): self.hs = hs self.account_activity_handler = hs.get_account_validity_handler() self.auth = hs.get_auth() - self.account_validity = self.hs.config.account_validity + self.account_validity_renew_by_email_enabled = ( + hs.config.account_validity.account_validity_renew_by_email_enabled + ) async def on_POST(self, request): - if not self.account_validity.renew_by_email_enabled: + if not self.account_validity_renew_by_email_enabled: raise AuthError( 403, "Account renewal via email is disabled on this server." ) diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 833214b7e0..6e5ee557d2 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -91,13 +91,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): id_column=None, ) - self._account_validity = hs.config.account_validity - if hs.config.run_background_tasks and self._account_validity.enabled: - self._clock.call_later( - 0.0, - self._set_expiration_date_when_missing, + self._account_validity_enabled = ( + hs.config.account_validity.account_validity_enabled + ) + self._account_validity_period = None + self._account_validity_startup_job_max_delta = None + if self._account_validity_enabled: + self._account_validity_period = ( + hs.config.account_validity.account_validity_period + ) + self._account_validity_startup_job_max_delta = ( + hs.config.account_validity.account_validity_startup_job_max_delta ) + if hs.config.run_background_tasks: + self._clock.call_later( + 0.0, + self._set_expiration_date_when_missing, + ) + # Create a background job for culling expired 3PID validity tokens if hs.config.run_background_tasks: self._clock.looping_call( @@ -194,6 +206,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): expiration_ts: int, email_sent: bool, renewal_token: Optional[str] = None, + token_used_ts: Optional[int] = None, ) -> None: """Updates the account validity properties of the given account, with the given values. @@ -207,6 +220,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): period. renewal_token: Renewal token the user can use to extend the validity of their account. Defaults to no token. + token_used_ts: A timestamp of when the current token was used to renew + the account. """ def set_account_validity_for_user_txn(txn): @@ -218,6 +233,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "expiration_ts_ms": expiration_ts, "email_sent": email_sent, "renewal_token": renewal_token, + "token_used_ts_ms": token_used_ts, }, ) self._invalidate_cache_and_stream( @@ -231,7 +247,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): async def set_renewal_token_for_user( self, user_id: str, renewal_token: str ) -> None: - """Defines a renewal token for a given user. + """Defines a renewal token for a given user, and clears the token_used timestamp. Args: user_id: ID of the user to set the renewal token for. @@ -244,26 +260,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): await self.db_pool.simple_update_one( table="account_validity", keyvalues={"user_id": user_id}, - updatevalues={"renewal_token": renewal_token}, + updatevalues={"renewal_token": renewal_token, "token_used_ts_ms": None}, desc="set_renewal_token_for_user", ) - async def get_user_from_renewal_token(self, renewal_token: str) -> str: - """Get a user ID from a renewal token. + async def get_user_from_renewal_token( + self, renewal_token: str + ) -> Tuple[str, int, Optional[int]]: + """Get a user ID and renewal status from a renewal token. Args: renewal_token: The renewal token to perform the lookup with. Returns: - The ID of the user to which the token belongs. + A tuple of containing the following values: + * The ID of a user to which the token belongs. + * An int representing the user's expiry timestamp as milliseconds since the + epoch, or 0 if the token was invalid. + * An optional int representing the timestamp of when the user renewed their + account timestamp as milliseconds since the epoch. None if the account + has not been renewed using the current token yet. """ - return await self.db_pool.simple_select_one_onecol( + ret_dict = await self.db_pool.simple_select_one( table="account_validity", keyvalues={"renewal_token": renewal_token}, - retcol="user_id", + retcols=["user_id", "expiration_ts_ms", "token_used_ts_ms"], desc="get_user_from_renewal_token", ) + return ( + ret_dict["user_id"], + ret_dict["expiration_ts_ms"], + ret_dict["token_used_ts_ms"], + ) + async def get_renewal_token_for_user(self, user_id: str) -> str: """Get the renewal token associated with a given user ID. @@ -302,7 +332,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): "get_users_expiring_soon", select_users_txn, self._clock.time_msec(), - self.config.account_validity.renew_at, + self.config.account_validity_renew_at, ) async def set_renewal_mail_status(self, user_id: str, email_sent: bool) -> None: @@ -964,11 +994,11 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): delta equal to 10% of the validity period. """ now_ms = self._clock.time_msec() - expiration_ts = now_ms + self._account_validity.period + expiration_ts = now_ms + self._account_validity_period if use_delta: expiration_ts = self.rand.randrange( - expiration_ts - self._account_validity.startup_job_max_delta, + expiration_ts - self._account_validity_startup_job_max_delta, expiration_ts, ) @@ -1412,7 +1442,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore): except self.database_engine.module.IntegrityError: raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE) - if self._account_validity.enabled: + if self._account_validity_enabled: self.set_expiration_date_for_user_txn(txn, user_id) if create_profile_with_displayname: diff --git a/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql new file mode 100644 index 0000000000..4836dac16e --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql @@ -0,0 +1,18 @@ +/* Copyright 2020 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Track when users renew their account using the value of the 'renewal_token' column. +-- This field should be set to NULL after a fresh token is generated. +ALTER TABLE account_validity ADD token_used_ts_ms BIGINT; diff --git a/tests/rest/client/v2_alpha/test_register.py b/tests/rest/client/v2_alpha/test_register.py index 054d4e4140..98695b05d5 100644 --- a/tests/rest/client/v2_alpha/test_register.py +++ b/tests/rest/client/v2_alpha/test_register.py @@ -492,8 +492,8 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase): (user_id, tok) = self.create_user() - # Move 6 days forward. This should trigger a renewal email to be sent. - self.reactor.advance(datetime.timedelta(days=6).total_seconds()) + # Move 5 days forward. This should trigger a renewal email to be sent. + self.reactor.advance(datetime.timedelta(days=5).total_seconds()) self.assertEqual(len(self.email_attempts), 1) # Retrieving the URL from the email is too much pain for now, so we @@ -504,14 +504,32 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase): self.assertEquals(channel.result["code"], b"200", channel.result) # Check that we're getting HTML back. - content_type = None - for header in channel.result.get("headers", []): - if header[0] == b"Content-Type": - content_type = header[1] - self.assertEqual(content_type, b"text/html; charset=utf-8", channel.result) + content_type = channel.headers.getRawHeaders(b"Content-Type") + self.assertEqual(content_type, [b"text/html; charset=utf-8"], channel.result) # Check that the HTML we're getting is the one we expect on a successful renewal. - expected_html = self.hs.config.account_validity.account_renewed_html_content + expiration_ts = self.get_success(self.store.get_expiration_ts_for_user(user_id)) + expected_html = self.hs.config.account_validity.account_validity_account_renewed_template.render( + expiration_ts=expiration_ts + ) + self.assertEqual( + channel.result["body"], expected_html.encode("utf8"), channel.result + ) + + # Move 1 day forward. Try to renew with the same token again. + url = "/_matrix/client/unstable/account_validity/renew?token=%s" % renewal_token + channel = self.make_request(b"GET", url) + self.assertEquals(channel.result["code"], b"200", channel.result) + + # Check that we're getting HTML back. + content_type = channel.headers.getRawHeaders(b"Content-Type") + self.assertEqual(content_type, [b"text/html; charset=utf-8"], channel.result) + + # Check that the HTML we're getting is the one we expect when reusing a + # token. The account expiration date should not have changed. + expected_html = self.hs.config.account_validity.account_validity_account_previously_renewed_template.render( + expiration_ts=expiration_ts + ) self.assertEqual( channel.result["body"], expected_html.encode("utf8"), channel.result ) @@ -531,15 +549,14 @@ class AccountValidityRenewalByEmailTestCase(unittest.HomeserverTestCase): self.assertEquals(channel.result["code"], b"404", channel.result) # Check that we're getting HTML back. - content_type = None - for header in channel.result.get("headers", []): - if header[0] == b"Content-Type": - content_type = header[1] - self.assertEqual(content_type, b"text/html; charset=utf-8", channel.result) + content_type = channel.headers.getRawHeaders(b"Content-Type") + self.assertEqual(content_type, [b"text/html; charset=utf-8"], channel.result) # Check that the HTML we're getting is the one we expect when using an # invalid/unknown token. - expected_html = self.hs.config.account_validity.invalid_token_html_content + expected_html = ( + self.hs.config.account_validity.account_validity_invalid_token_template.render() + ) self.assertEqual( channel.result["body"], expected_html.encode("utf8"), channel.result ) @@ -647,7 +664,12 @@ class AccountValidityBackgroundJobTestCase(unittest.HomeserverTestCase): config["account_validity"] = {"enabled": False} self.hs = self.setup_test_homeserver(config=config) - self.hs.config.account_validity.period = self.validity_period + + # We need to set these directly, instead of in the homeserver config dict above. + # This is due to account validity-related config options not being read by + # Synapse when account_validity.enabled is False. + self.hs.get_datastore()._account_validity_period = self.validity_period + self.hs.get_datastore()._account_validity_startup_job_max_delta = self.max_delta self.store = self.hs.get_datastore() -- cgit 1.5.1 From 495b214f4f8f45d16ffee851c8ab7a380dd0e2b2 Mon Sep 17 00:00:00 2001 From: Jonathan de Jong Date: Tue, 20 Apr 2021 12:50:49 +0200 Subject: Fix (final) Bugbear violations (#9838) --- changelog.d/9838.misc | 1 + scripts-dev/definitions.py | 2 +- scripts-dev/list_url_patterns.py | 2 +- setup.cfg | 3 +-- synapse/event_auth.py | 2 +- synapse/federation/send_queue.py | 4 ++-- synapse/handlers/auth.py | 2 +- synapse/handlers/device.py | 13 +++++-------- synapse/handlers/federation.py | 2 +- synapse/logging/_remote.py | 4 ++-- synapse/rest/key/v2/remote_key_resource.py | 4 ++-- synapse/storage/databases/main/events.py | 10 +++++----- tests/handlers/test_federation.py | 2 +- tests/replication/tcp/streams/test_events.py | 4 ++-- tests/rest/admin/test_device.py | 4 ++-- tests/rest/admin/test_event_reports.py | 8 ++++---- tests/rest/admin/test_room.py | 8 ++++---- tests/rest/admin/test_statistics.py | 2 +- tests/rest/admin/test_user.py | 4 ++-- tests/rest/client/v1/test_rooms.py | 6 +++--- tests/storage/test_event_metrics.py | 4 ++-- tests/unittest.py | 2 +- tests/utils.py | 2 +- 23 files changed, 46 insertions(+), 49 deletions(-) create mode 100644 changelog.d/9838.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9838.misc b/changelog.d/9838.misc new file mode 100644 index 0000000000..b98ce56309 --- /dev/null +++ b/changelog.d/9838.misc @@ -0,0 +1 @@ +Introduce flake8-bugbear to the test suite and fix some of its lint violations. \ No newline at end of file diff --git a/scripts-dev/definitions.py b/scripts-dev/definitions.py index 313860df13..c82ddd9677 100755 --- a/scripts-dev/definitions.py +++ b/scripts-dev/definitions.py @@ -140,7 +140,7 @@ if __name__ == "__main__": definitions = {} for directory in args.directories: - for root, dirs, files in os.walk(directory): + for root, _, files in os.walk(directory): for filename in files: if filename.endswith(".py"): filepath = os.path.join(root, filename) diff --git a/scripts-dev/list_url_patterns.py b/scripts-dev/list_url_patterns.py index 26ad7c67f4..e85420dea8 100755 --- a/scripts-dev/list_url_patterns.py +++ b/scripts-dev/list_url_patterns.py @@ -48,7 +48,7 @@ args = parser.parse_args() for directory in args.directories: - for root, dirs, files in os.walk(directory): + for root, _, files in os.walk(directory): for filename in files: if filename.endswith(".py"): filepath = os.path.join(root, filename) diff --git a/setup.cfg b/setup.cfg index 33601b71d5..e5ceb7ed19 100644 --- a/setup.cfg +++ b/setup.cfg @@ -18,8 +18,7 @@ ignore = # E203: whitespace before ':' (which is contrary to pep8?) # E731: do not assign a lambda expression, use a def # E501: Line too long (black enforces this for us) -# B007: Subsection of the bugbear suite (TODO: add in remaining fixes) -ignore=W503,W504,E203,E731,E501,B007 +ignore=W503,W504,E203,E731,E501 [isort] line_length = 88 diff --git a/synapse/event_auth.py b/synapse/event_auth.py index 5234e3f81e..c831d9f73c 100644 --- a/synapse/event_auth.py +++ b/synapse/event_auth.py @@ -670,7 +670,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase public_key = public_key_object["public_key"] try: for server, signature_block in signed["signatures"].items(): - for key_name, encoded_signature in signature_block.items(): + for key_name in signature_block.keys(): if not key_name.startswith("ed25519:"): continue verify_key = decode_verify_key_bytes( diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index d71f04e43e..65d76ea974 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -501,10 +501,10 @@ def process_rows_for_federation( states=[state], destinations=destinations ) - for destination, edu_map in buff.keyed_edus.items(): + for edu_map in buff.keyed_edus.values(): for key, edu in edu_map.items(): transaction_queue.send_edu(edu, key) - for destination, edu_list in buff.edus.items(): + for edu_list in buff.edus.values(): for edu in edu_list: transaction_queue.send_edu(edu, None) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index b8a37b6477..36f2450e2e 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -1248,7 +1248,7 @@ class AuthHandler(BaseHandler): # see if any of our auth providers want to know about this for provider in self.password_providers: - for token, token_id, device_id in tokens_and_devices: + for token, _, device_id in tokens_and_devices: await provider.on_logged_out( user_id=user_id, device_id=device_id, access_token=token ) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index d75edb184b..c1d7800981 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -156,8 +156,7 @@ class DeviceWorkerHandler(BaseHandler): # The user may have left the room # TODO: Check if they actually did or if we were just invited. if room_id not in room_ids: - for key, event_id in current_state_ids.items(): - etype, state_key = key + for etype, state_key in current_state_ids.keys(): if etype != EventTypes.Member: continue possibly_left.add(state_key) @@ -179,8 +178,7 @@ class DeviceWorkerHandler(BaseHandler): log_kv( {"event": "encountered empty previous state", "room_id": room_id} ) - for key, event_id in current_state_ids.items(): - etype, state_key = key + for etype, state_key in current_state_ids.keys(): if etype != EventTypes.Member: continue possibly_changed.add(state_key) @@ -198,8 +196,7 @@ class DeviceWorkerHandler(BaseHandler): for state_dict in prev_state_ids.values(): member_event = state_dict.get((EventTypes.Member, user_id), None) if not member_event or member_event != current_member_id: - for key, event_id in current_state_ids.items(): - etype, state_key = key + for etype, state_key in current_state_ids.keys(): if etype != EventTypes.Member: continue possibly_changed.add(state_key) @@ -714,7 +711,7 @@ class DeviceListUpdater: # This can happen since we batch updates return - for device_id, stream_id, prev_ids, content in pending_updates: + for device_id, stream_id, prev_ids, _ in pending_updates: logger.debug( "Handling update %r/%r, ID: %r, prev: %r ", user_id, @@ -740,7 +737,7 @@ class DeviceListUpdater: else: # Simply update the single device, since we know that is the only # change (because of the single prev_id matching the current cache) - for device_id, stream_id, prev_ids, content in pending_updates: + for device_id, stream_id, _, content in pending_updates: await self.store.update_remote_device_list_cache_entry( user_id, device_id, content, stream_id ) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 4b3730aa3b..dbdd7d2db3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -2956,7 +2956,7 @@ class FederationHandler(BaseHandler): try: # for each sig on the third_party_invite block of the actual invite for server, signature_block in signed["signatures"].items(): - for key_name, encoded_signature in signature_block.items(): + for key_name in signature_block.keys(): if not key_name.startswith("ed25519:"): continue diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index 4e8b0f8d10..c515690b38 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -226,11 +226,11 @@ class RemoteHandler(logging.Handler): old_buffer = self._buffer self._buffer = deque() - for i in range(buffer_split): + for _ in range(buffer_split): self._buffer.append(old_buffer.popleft()) end_buffer = [] - for i in range(buffer_split): + for _ in range(buffer_split): end_buffer.append(old_buffer.pop()) self._buffer.extend(reversed(end_buffer)) diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py index c57ac22e58..f648678b09 100644 --- a/synapse/rest/key/v2/remote_key_resource.py +++ b/synapse/rest/key/v2/remote_key_resource.py @@ -144,7 +144,7 @@ class RemoteKey(DirectServeJsonResource): # Note that the value is unused. cache_misses = {} # type: Dict[str, Dict[str, int]] - for (server_name, key_id, from_server), results in cached.items(): + for (server_name, key_id, _), results in cached.items(): results = [(result["ts_added_ms"], result) for result in results] if not results and key_id is not None: @@ -206,7 +206,7 @@ class RemoteKey(DirectServeJsonResource): # Cast to bytes since postgresql returns a memoryview. json_results.add(bytes(most_recent_result["key_json"])) else: - for ts_added, result in results: + for _, result in results: # Cast to bytes since postgresql returns a memoryview. json_results.add(bytes(result["key_json"])) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index a362521e20..fd25c8112d 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -170,7 +170,7 @@ class PersistEventsStore: ) async with stream_ordering_manager as stream_orderings: - for (event, context), stream in zip(events_and_contexts, stream_orderings): + for (event, _), stream in zip(events_and_contexts, stream_orderings): event.internal_metadata.stream_ordering = stream await self.db_pool.runInteraction( @@ -297,7 +297,7 @@ class PersistEventsStore: txn.execute(sql + clause, args) to_recursively_check = [] - for event_id, prev_event_id, metadata, rejected in txn: + for _, prev_event_id, metadata, rejected in txn: if prev_event_id in existing_prevs: continue @@ -1127,7 +1127,7 @@ class PersistEventsStore: def _update_forward_extremities_txn( self, txn, new_forward_extremities, max_stream_order ): - for room_id, new_extrem in new_forward_extremities.items(): + for room_id in new_forward_extremities.keys(): self.db_pool.simple_delete_txn( txn, table="event_forward_extremities", keyvalues={"room_id": room_id} ) @@ -1399,7 +1399,7 @@ class PersistEventsStore: ] state_values = [] - for event, context in state_events_and_contexts: + for event, _ in state_events_and_contexts: vals = { "event_id": event.event_id, "room_id": event.room_id, @@ -1468,7 +1468,7 @@ class PersistEventsStore: # nothing to do here return - for event, context in events_and_contexts: + for event, _ in events_and_contexts: if event.type == EventTypes.Redaction and event.redacts is not None: # Remove the entries in the event_push_actions table for the # redacted event. diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index c7b0975a19..8796af45ed 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -222,7 +222,7 @@ class FederationTestCase(unittest.HomeserverTestCase): room_version, ) - for i in range(3): + for _ in range(3): event = create_invite() self.get_success( self.handler.on_invite_request( diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py index 323237c1bb..f51fa0a79e 100644 --- a/tests/replication/tcp/streams/test_events.py +++ b/tests/replication/tcp/streams/test_events.py @@ -239,7 +239,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # the state rows are unsorted state_rows = [] # type: List[EventsStreamCurrentStateRow] - for stream_name, token, row in received_rows: + for stream_name, _, row in received_rows: self.assertEqual("events", stream_name) self.assertIsInstance(row, EventsStreamRow) self.assertEqual(row.type, "state") @@ -356,7 +356,7 @@ class EventsStreamTestCase(BaseStreamTestCase): # the state rows are unsorted state_rows = [] # type: List[EventsStreamCurrentStateRow] - for j in range(STATES_PER_USER + 1): + for _ in range(STATES_PER_USER + 1): stream_name, token, row = received_rows.pop(0) self.assertEqual("events", stream_name) self.assertIsInstance(row, EventsStreamRow) diff --git a/tests/rest/admin/test_device.py b/tests/rest/admin/test_device.py index ecbee30bb5..120730b764 100644 --- a/tests/rest/admin/test_device.py +++ b/tests/rest/admin/test_device.py @@ -430,7 +430,7 @@ class DevicesRestTestCase(unittest.HomeserverTestCase): """ # Create devices number_devices = 5 - for n in range(number_devices): + for _ in range(number_devices): self.login("user", "pass") # Get devices @@ -547,7 +547,7 @@ class DeleteDevicesRestTestCase(unittest.HomeserverTestCase): # Create devices number_devices = 5 - for n in range(number_devices): + for _ in range(number_devices): self.login("user", "pass") # Get devices diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py index 8c66da3af4..29341bc6e9 100644 --- a/tests/rest/admin/test_event_reports.py +++ b/tests/rest/admin/test_event_reports.py @@ -48,22 +48,22 @@ class EventReportsTestCase(unittest.HomeserverTestCase): self.helper.join(self.room_id2, user=self.admin_user, tok=self.admin_user_tok) # Two rooms and two users. Every user sends and reports every room event - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id1, user_tok=self.other_user_tok, ) - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id2, user_tok=self.other_user_tok, ) - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id1, user_tok=self.admin_user_tok, ) - for i in range(5): + for _ in range(5): self._create_event_and_report( room_id=self.room_id2, user_tok=self.admin_user_tok, diff --git a/tests/rest/admin/test_room.py b/tests/rest/admin/test_room.py index 6bcd997085..6b84188120 100644 --- a/tests/rest/admin/test_room.py +++ b/tests/rest/admin/test_room.py @@ -615,7 +615,7 @@ class RoomTestCase(unittest.HomeserverTestCase): # Create 3 test rooms total_rooms = 3 room_ids = [] - for x in range(total_rooms): + for _ in range(total_rooms): room_id = self.helper.create_room_as( self.admin_user, tok=self.admin_user_tok ) @@ -679,7 +679,7 @@ class RoomTestCase(unittest.HomeserverTestCase): # Create 5 test rooms total_rooms = 5 room_ids = [] - for x in range(total_rooms): + for _ in range(total_rooms): room_id = self.helper.create_room_as( self.admin_user, tok=self.admin_user_tok ) @@ -1577,7 +1577,7 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase): channel.json_body["event"]["event_id"], events[midway]["event_id"] ) - for i, found_event in enumerate(channel.json_body["events_before"]): + for found_event in channel.json_body["events_before"]: for j, posted_event in enumerate(events): if found_event["event_id"] == posted_event["event_id"]: self.assertTrue(j < midway) @@ -1585,7 +1585,7 @@ class JoinAliasRoomTestCase(unittest.HomeserverTestCase): else: self.fail("Event %s from events_before not found" % j) - for i, found_event in enumerate(channel.json_body["events_after"]): + for found_event in channel.json_body["events_after"]: for j, posted_event in enumerate(events): if found_event["event_id"] == posted_event["event_id"]: self.assertTrue(j > midway) diff --git a/tests/rest/admin/test_statistics.py b/tests/rest/admin/test_statistics.py index 363bdeeb2d..79cac4266b 100644 --- a/tests/rest/admin/test_statistics.py +++ b/tests/rest/admin/test_statistics.py @@ -467,7 +467,7 @@ class UserMediaStatisticsTestCase(unittest.HomeserverTestCase): number_media: Number of media to be created for the user """ upload_resource = self.media_repo.children[b"upload"] - for i in range(number_media): + for _ in range(number_media): # file size is 67 Byte image_data = unhexlify( b"89504e470d0a1a0a0000000d4948445200000001000000010806" diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 2844c493fc..b3afd51522 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -1937,7 +1937,7 @@ class UserMembershipRestTestCase(unittest.HomeserverTestCase): # Create rooms and join other_user_tok = self.login("user", "pass") number_rooms = 5 - for n in range(number_rooms): + for _ in range(number_rooms): self.helper.create_room_as(self.other_user, tok=other_user_tok) # Get rooms @@ -2517,7 +2517,7 @@ class UserMediaRestTestCase(unittest.HomeserverTestCase): user_token: Access token of the user number_media: Number of media to be created for the user """ - for i in range(number_media): + for _ in range(number_media): # file size is 67 Byte image_data = unhexlify( b"89504e470d0a1a0a0000000d4948445200000001000000010806" diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py index 92babf65e0..a3694f3d02 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py @@ -646,7 +646,7 @@ class RoomInviteRatelimitTestCase(RoomBase): def test_invites_by_users_ratelimit(self): """Tests that invites to a specific user are actually rate-limited.""" - for i in range(3): + for _ in range(3): room_id = self.helper.create_room_as(self.user_id) self.helper.invite(room_id, self.user_id, "@other-users:red") @@ -668,7 +668,7 @@ class RoomJoinRatelimitTestCase(RoomBase): ) def test_join_local_ratelimit(self): """Tests that local joins are actually rate-limited.""" - for i in range(3): + for _ in range(3): self.helper.create_room_as(self.user_id) self.helper.create_room_as(self.user_id, expect_code=429) @@ -733,7 +733,7 @@ class RoomJoinRatelimitTestCase(RoomBase): for path in paths_to_test: # Make sure we send more requests than the rate-limiting config would allow # if all of these requests ended up joining the user to a room. - for i in range(4): + for _ in range(4): channel = self.make_request("POST", path % room_id, {}) self.assertEquals(channel.code, 200) diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index 397e68fe0a..088fbb247b 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -38,12 +38,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase): last_event = None # Make a real event chain - for i in range(event_count): + for _ in range(event_count): ev = self.create_and_send_event(room_id, user, False, last_event) last_event = [ev] # Sprinkle in some extremities - for i in range(extrems): + for _ in range(extrems): ev = self.create_and_send_event(room_id, user, False, last_event) # Let it run for a while, then pull out the statistics from the diff --git a/tests/unittest.py b/tests/unittest.py index d890ad981f..ee22a53849 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -133,7 +133,7 @@ class TestCase(unittest.TestCase): def assertObjectHasAttributes(self, attrs, obj): """Asserts that the given object has each of the attributes given, and that the value of each matches according to assertEquals.""" - for (key, value) in attrs.items(): + for key in attrs.keys(): if not hasattr(obj, key): raise AssertionError("Expected obj to have a '.%s'" % key) try: diff --git a/tests/utils.py b/tests/utils.py index af6b32fc66..63d52b9140 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -303,7 +303,7 @@ def setup_test_homeserver( # database for a few more seconds due to flakiness, preventing # us from dropping it when the test is over. If we can't drop # it, warn and move on. - for x in range(5): + for _ in range(5): try: cur.execute("DROP DATABASE IF EXISTS %s;" % (test_db,)) db_conn.commit() -- cgit 1.5.1 From 294c67503300b6bfa7785a5cfa55e25c1e452574 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 22 Apr 2021 16:43:50 +0100 Subject: Remove `synapse.types.Collection` (#9856) This is no longer required, since we have dropped support for Python 3.5. --- changelog.d/9856.misc | 1 + synapse/config/oidc.py | 4 ++-- synapse/events/spamcheck.py | 3 +-- synapse/federation/sender/__init__.py | 14 ++++++++++++-- synapse/handlers/appservice.py | 4 ++-- synapse/handlers/device.py | 3 +-- synapse/handlers/presence.py | 3 ++- synapse/handlers/sso.py | 3 ++- synapse/handlers/sync.py | 13 +++++++++++-- synapse/notifier.py | 9 ++------- synapse/replication/tcp/protocol.py | 3 +-- synapse/state/__init__.py | 3 ++- synapse/state/v2.py | 3 ++- synapse/storage/_base.py | 4 ++-- synapse/storage/database.py | 2 +- synapse/storage/databases/main/devices.py | 4 ++-- synapse/storage/databases/main/event_federation.py | 3 +-- synapse/storage/databases/main/events_worker.py | 13 +++++++++++-- synapse/storage/databases/main/roommember.py | 14 ++++++++++++-- synapse/storage/databases/main/search.py | 3 +-- synapse/storage/databases/main/stream.py | 4 ++-- synapse/storage/persist_events.py | 3 +-- synapse/storage/prepare_database.py | 3 +-- synapse/types.py | 14 -------------- synapse/util/caches/stream_change_cache.py | 3 +-- synapse/util/iterutils.py | 3 +-- 26 files changed, 77 insertions(+), 62 deletions(-) create mode 100644 changelog.d/9856.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9856.misc b/changelog.d/9856.misc new file mode 100644 index 0000000000..d67e8c386a --- /dev/null +++ b/changelog.d/9856.misc @@ -0,0 +1 @@ +Remove redundant `synapse.types.Collection` type definition. diff --git a/synapse/config/oidc.py b/synapse/config/oidc.py index 72402eb81d..ea0abf5aa2 100644 --- a/synapse/config/oidc.py +++ b/synapse/config/oidc.py @@ -14,14 +14,14 @@ # limitations under the License. from collections import Counter -from typing import Iterable, List, Mapping, Optional, Tuple, Type +from typing import Collection, Iterable, List, Mapping, Optional, Tuple, Type import attr from synapse.config._util import validate_config from synapse.config.sso import SsoAttributeRequirement from synapse.python_dependencies import DependencyException, check_requirements -from synapse.types import Collection, JsonDict +from synapse.types import JsonDict from synapse.util.module_loader import load_module from synapse.util.stringutils import parse_and_validate_mxc_uri diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py index c727b48c1e..7118d5f52d 100644 --- a/synapse/events/spamcheck.py +++ b/synapse/events/spamcheck.py @@ -15,12 +15,11 @@ import inspect import logging -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, Union from synapse.rest.media.v1._base import FileInfo from synapse.rest.media.v1.media_storage import ReadableFileWrapper from synapse.spam_checker_api import RegistrationBehaviour -from synapse.types import Collection from synapse.util.async_helpers import maybe_awaitable if TYPE_CHECKING: diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index b00a55324c..022bbf7dad 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,7 +14,17 @@ import abc import logging -from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + Hashable, + Iterable, + List, + Optional, + Set, + Tuple, +) from prometheus_client import Counter @@ -31,7 +41,7 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken from synapse.util.metrics import Measure if TYPE_CHECKING: diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index d7bc4e23ed..177310f0be 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, List, Optional, Union +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Union from prometheus_client import Counter @@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import ( wrap_as_background_process, ) from synapse.storage.databases.main.directory import RoomAliasMapping -from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID +from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID from synapse.util.metrics import Measure if TYPE_CHECKING: diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c1d7800981..34d39e3b44 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -14,7 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple from synapse.api import errors from synapse.api.constants import EventTypes @@ -28,7 +28,6 @@ from synapse.api.errors import ( from synapse.logging.opentracing import log_kv, set_tag, trace from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import ( - Collection, JsonDict, StreamToken, UserID, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 598466c9bd..7fd28ffa54 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -28,6 +28,7 @@ from bisect import bisect from contextlib import contextmanager from typing import ( TYPE_CHECKING, + Collection, Dict, FrozenSet, Iterable, @@ -59,7 +60,7 @@ from synapse.replication.tcp.commands import ClearUserSyncsCommand from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream from synapse.state import StateHandler from synapse.storage.databases.main import DataStore -from synapse.types import Collection, JsonDict, UserID, get_domain_from_id +from synapse.types import JsonDict, UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import _CacheContext, cached from synapse.util.metrics import Measure diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 8d00ffdc73..044ff06d84 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -18,6 +18,7 @@ from typing import ( Any, Awaitable, Callable, + Collection, Dict, Iterable, List, @@ -40,7 +41,7 @@ from synapse.handlers.ui_auth import UIAuthSessionDataConstants from synapse.http import get_request_user_agent from synapse.http.server import respond_with_html, respond_with_redirect from synapse.http.site import SynapseRequest -from synapse.types import Collection, JsonDict, UserID, contains_invalid_mxid_characters +from synapse.types import JsonDict, UserID, contains_invalid_mxid_characters from synapse.util.async_helpers import Linearizer from synapse.util.stringutils import random_string diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index dc8ee8cd17..a9a3ee05c3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -14,7 +14,17 @@ # limitations under the License. import itertools import logging -from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Any, + Collection, + Dict, + FrozenSet, + List, + Optional, + Set, + Tuple, +) import attr from prometheus_client import Counter @@ -28,7 +38,6 @@ from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter from synapse.types import ( - Collection, JsonDict, MutableStateMap, Requester, diff --git a/synapse/notifier.py b/synapse/notifier.py index d5ab77058d..b9531007e2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -17,6 +17,7 @@ from collections import namedtuple from typing import ( Awaitable, Callable, + Collection, Dict, Iterable, List, @@ -42,13 +43,7 @@ from synapse.logging.opentracing import log_kv, start_active_span from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.streams.config import PaginationConfig -from synapse.types import ( - Collection, - PersistedEventPosition, - RoomStreamToken, - StreamToken, - UserID, -) +from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID from synapse.util.async_helpers import ObservableDeferred, timeout_deferred from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 6860576e78..6e3705364f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -49,7 +49,7 @@ import fcntl import logging import struct from inspect import isawaitable -from typing import TYPE_CHECKING, List, Optional +from typing import TYPE_CHECKING, Collection, List, Optional from prometheus_client import Counter from zope.interface import Interface, implementer @@ -76,7 +76,6 @@ from synapse.replication.tcp.commands import ( ServerCommand, parse_command_from_line, ) -from synapse.types import Collection from synapse.util import Clock from synapse.util.stringutils import random_string diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index c7ee731154..b3bd92d37c 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -19,6 +19,7 @@ from typing import ( Any, Awaitable, Callable, + Collection, DefaultDict, Dict, FrozenSet, @@ -46,7 +47,7 @@ from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.roommember import ProfileInfo -from synapse.types import Collection, StateMap +from synapse.types import StateMap from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.metrics import Measure, measure_func diff --git a/synapse/state/v2.py b/synapse/state/v2.py index 32671ddbde..008644cd98 100644 --- a/synapse/state/v2.py +++ b/synapse/state/v2.py @@ -18,6 +18,7 @@ import logging from typing import ( Any, Callable, + Collection, Dict, Generator, Iterable, @@ -37,7 +38,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import AuthError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase -from synapse.types import Collection, MutableStateMap, StateMap +from synapse.types import MutableStateMap, StateMap from synapse.util import Clock logger = logging.getLogger(__name__) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 56dd3a4861..d472676acf 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -16,13 +16,13 @@ import logging import random from abc import ABCMeta -from typing import TYPE_CHECKING, Any, Iterable, Optional, Union +from typing import TYPE_CHECKING, Any, Collection, Iterable, Optional, Union from synapse.storage.database import LoggingTransaction # noqa: F401 from synapse.storage.database import make_in_list_sql_clause # noqa: F401 from synapse.storage.database import DatabasePool from synapse.storage.types import Connection -from synapse.types import Collection, StreamToken, get_domain_from_id +from synapse.types import StreamToken, get_domain_from_id from synapse.util import json_decoder if TYPE_CHECKING: diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 9a6d2b21f9..9452368bf0 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -20,6 +20,7 @@ from time import monotonic as monotonic_time from typing import ( Any, Callable, + Collection, Dict, Iterable, Iterator, @@ -48,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.background_updates import BackgroundUpdater from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.types import Connection, Cursor -from synapse.types import Collection # python 3 does not have a maximum int value MAX_TXN_ID = 2 ** 63 - 1 diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index b204875580..9be713399f 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -15,7 +15,7 @@ # limitations under the License. import abc import logging -from typing import Any, Dict, Iterable, List, Optional, Set, Tuple +from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple from synapse.api.errors import Codes, StoreError from synapse.logging.opentracing import ( @@ -31,7 +31,7 @@ from synapse.storage.database import ( LoggingTransaction, make_tuple_comparison_clause, ) -from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key +from synapse.types import JsonDict, get_verify_key_from_cross_signing_key from synapse.util import json_decoder, json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.lrucache import LruCache diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 32ce70a396..ff81d5cd17 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -14,7 +14,7 @@ import itertools import logging from queue import Empty, PriorityQueue -from typing import Dict, Iterable, List, Set, Tuple +from typing import Collection, Dict, Iterable, List, Set, Tuple from synapse.api.errors import StoreError from synapse.events import EventBase @@ -25,7 +25,6 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.databases.main.signatures import SignatureWorkerStore from synapse.storage.engines import PostgresEngine from synapse.storage.types import Cursor -from synapse.types import Collection from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index 64d70785b8..2c823e09cf 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -15,7 +15,16 @@ import logging import threading from collections import namedtuple -from typing import Container, Dict, Iterable, List, Optional, Tuple, overload +from typing import ( + Collection, + Container, + Dict, + Iterable, + List, + Optional, + Tuple, + overload, +) from constantly import NamedConstant, Names from typing_extensions import Literal @@ -45,7 +54,7 @@ from synapse.storage.database import DatabasePool from synapse.storage.engines import PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.storage.util.sequence import build_sequence_generator -from synapse.types import Collection, JsonDict, get_domain_from_id +from synapse.types import JsonDict, get_domain_from_id from synapse.util.caches.descriptors import cached from synapse.util.caches.lrucache import LruCache from synapse.util.iterutils import batch_iter diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index fd525dce65..bd8513cd43 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -13,7 +13,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple +from typing import ( + TYPE_CHECKING, + Collection, + Dict, + FrozenSet, + Iterable, + List, + Optional, + Set, + Tuple, +) from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase @@ -33,7 +43,7 @@ from synapse.storage.roommember import ( ProfileInfo, RoomsForUser, ) -from synapse.types import Collection, PersistedEventPosition, get_domain_from_id +from synapse.types import PersistedEventPosition, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py index 0276f30656..6480d5a9f5 100644 --- a/synapse/storage/databases/main/search.py +++ b/synapse/storage/databases/main/search.py @@ -15,7 +15,7 @@ import logging import re from collections import namedtuple -from typing import List, Optional, Set +from typing import Collection, List, Optional, Set from synapse.api.errors import SynapseError from synapse.events import EventBase @@ -23,7 +23,6 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla from synapse.storage.database import DatabasePool from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.storage.engines import PostgresEngine, Sqlite3Engine -from synapse.types import Collection logger = logging.getLogger(__name__) diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index db5ce4ea01..7581c7d3ff 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -37,7 +37,7 @@ what sort order was used: import abc import logging from collections import namedtuple -from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple +from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple from twisted.internet import defer @@ -53,7 +53,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import Collection, PersistedEventPosition, RoomStreamToken +from synapse.types import PersistedEventPosition, RoomStreamToken from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py index 87e040b014..33dc752d8f 100644 --- a/synapse/storage/persist_events.py +++ b/synapse/storage/persist_events.py @@ -17,7 +17,7 @@ import itertools import logging from collections import deque, namedtuple -from typing import Dict, Iterable, List, Optional, Set, Tuple +from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter, Histogram @@ -32,7 +32,6 @@ from synapse.storage.databases import Databases from synapse.storage.databases.main.events import DeltaState from synapse.storage.databases.main.events_worker import EventRedactBehaviour from synapse.types import ( - Collection, PersistedEventPosition, RoomStreamToken, StateMap, diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 05a9355974..7a2cbee426 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -17,7 +17,7 @@ import logging import os import re from collections import Counter -from typing import Generator, Iterable, List, Optional, TextIO, Tuple +from typing import Collection, Generator, Iterable, List, Optional, TextIO, Tuple import attr from typing_extensions import Counter as CounterType @@ -27,7 +27,6 @@ from synapse.storage.database import LoggingDatabaseConnection from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.engines.postgres import PostgresEngine from synapse.storage.types import Cursor -from synapse.types import Collection logger = logging.getLogger(__name__) diff --git a/synapse/types.py b/synapse/types.py index 21654ae686..e19f28d543 100644 --- a/synapse/types.py +++ b/synapse/types.py @@ -15,13 +15,11 @@ import abc import re import string -import sys from collections import namedtuple from typing import ( TYPE_CHECKING, Any, Dict, - Iterable, Mapping, MutableMapping, Optional, @@ -50,18 +48,6 @@ if TYPE_CHECKING: from synapse.appservice.api import ApplicationService from synapse.storage.databases.main import DataStore -# define a version of typing.Collection that works on python 3.5 -if sys.version_info[:3] >= (3, 6, 0): - from typing import Collection -else: - from typing import Container, Sized - - T_co = TypeVar("T_co", covariant=True) - - class Collection(Iterable[T_co], Container[T_co], Sized): # type: ignore - __slots__ = () - - # Define a state map type from type/state_key to T (usually an event ID or # event) T = TypeVar("T") diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 0469e7d120..e81e468899 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -14,11 +14,10 @@ import logging import math -from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union +from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union from sortedcontainers import SortedDict -from synapse.types import Collection from synapse.util import caches logger = logging.getLogger(__name__) diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 6f73b1d56d..abfdc29832 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -15,6 +15,7 @@ import heapq from itertools import islice from typing import ( + Collection, Dict, Generator, Iterable, @@ -26,8 +27,6 @@ from typing import ( TypeVar, ) -from synapse.types import Collection - T = TypeVar("T") -- cgit 1.5.1 From 69018acbd2d1f331d6a52335b4938c3753b16de6 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 22 Apr 2021 16:53:24 +0100 Subject: Clear the resync bit after resyncing device lists (#9867) Fixes #9866. --- changelog.d/9867.bugfix | 1 + synapse/handlers/device.py | 7 +++++++ synapse/storage/databases/main/devices.py | 19 +++++++++---------- 3 files changed, 17 insertions(+), 10 deletions(-) create mode 100644 changelog.d/9867.bugfix (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9867.bugfix b/changelog.d/9867.bugfix new file mode 100644 index 0000000000..f236de247d --- /dev/null +++ b/changelog.d/9867.bugfix @@ -0,0 +1 @@ +Fix a bug which could cause Synapse to get stuck in a loop of resyncing device lists. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 34d39e3b44..95bdc5902a 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -925,6 +925,10 @@ class DeviceListUpdater: else: cached_devices = await self.store.get_cached_devices_for_user(user_id) if cached_devices == {d["device_id"]: d for d in devices}: + logging.info( + "Skipping device list resync for %s, as our cache matches already", + user_id, + ) devices = [] ignore_devices = True @@ -940,6 +944,9 @@ class DeviceListUpdater: await self.store.update_remote_device_list_cache( user_id, devices, stream_id ) + # mark the cache as valid, whether or not we actually processed any device + # list updates. + await self.store.mark_remote_user_device_cache_as_valid(user_id) device_ids = [device["device_id"] for device in devices] # Handle cross-signing keys. diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 9be713399f..c9346de316 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -717,7 +717,15 @@ class DeviceWorkerStore(SQLBaseStore): keyvalues={"user_id": user_id}, values={}, insertion_values={"added_ts": self._clock.time_msec()}, - desc="make_remote_user_device_cache_as_stale", + desc="mark_remote_user_device_cache_as_stale", + ) + + async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None: + # Remove the database entry that says we need to resync devices, after a resync + await self.db_pool.simple_delete( + table="device_lists_remote_resync", + keyvalues={"user_id": user_id}, + desc="mark_remote_user_device_cache_as_valid", ) async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None: @@ -1289,15 +1297,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): lock=False, ) - # If we're replacing the remote user's device list cache presumably - # we've done a full resync, so we remove the entry that says we need - # to resync - self.db_pool.simple_delete_txn( - txn, - table="device_lists_remote_resync", - keyvalues={"user_id": user_id}, - ) - async def add_device_change_to_streams( self, user_id: str, device_ids: Collection[str], hosts: List[str] ): -- cgit 1.5.1 From 3853a7edfcee1c00ba4df04b06821397e1155257 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Apr 2021 11:47:07 +0100 Subject: Only store data in caches, not "smart" objects (#9845) --- changelog.d/9845.misc | 1 + synapse/push/bulk_push_rule_evaluator.py | 161 +++++++++++++++------------ synapse/storage/databases/main/roommember.py | 161 +++++++++++++++------------ 3 files changed, 182 insertions(+), 141 deletions(-) create mode 100644 changelog.d/9845.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9845.misc b/changelog.d/9845.misc new file mode 100644 index 0000000000..875dd6d131 --- /dev/null +++ b/changelog.d/9845.misc @@ -0,0 +1 @@ +Only store the raw data in the in-memory caches, rather than objects that include references to e.g. the data stores. diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 50b470c310..350646f458 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -106,6 +106,10 @@ class BulkPushRuleEvaluator: self.store = hs.get_datastore() self.auth = hs.get_auth() + # Used by `RulesForRoom` to ensure only one thing mutates the cache at a + # time. Keyed off room_id. + self._rules_linearizer = Linearizer(name="rules_for_room") + self.room_push_rule_cache_metrics = register_cache( "cache", "room_push_rule_cache", @@ -123,7 +127,16 @@ class BulkPushRuleEvaluator: dict of user_id -> push_rules """ room_id = event.room_id - rules_for_room = self._get_rules_for_room(room_id) + + rules_for_room_data = self._get_rules_for_room(room_id) + rules_for_room = RulesForRoom( + hs=self.hs, + room_id=room_id, + rules_for_room_cache=self._get_rules_for_room.cache, + room_push_rule_cache_metrics=self.room_push_rule_cache_metrics, + linearizer=self._rules_linearizer, + cached_data=rules_for_room_data, + ) rules_by_user = await rules_for_room.get_rules(event, context) @@ -142,17 +155,12 @@ class BulkPushRuleEvaluator: return rules_by_user @lru_cache() - def _get_rules_for_room(self, room_id: str) -> "RulesForRoom": - """Get the current RulesForRoom object for the given room id""" - # It's important that RulesForRoom gets added to self._get_rules_for_room.cache + def _get_rules_for_room(self, room_id: str) -> "RulesForRoomData": + """Get the current RulesForRoomData object for the given room id""" + # It's important that the RulesForRoomData object gets added to self._get_rules_for_room.cache # before any lookup methods get called on it as otherwise there may be # a race if invalidate_all gets called (which assumes its in the cache) - return RulesForRoom( - self.hs, - room_id, - self._get_rules_for_room.cache, - self.room_push_rule_cache_metrics, - ) + return RulesForRoomData() async def _get_power_levels_and_sender_level( self, event: EventBase, context: EventContext @@ -282,11 +290,49 @@ def _condition_checker( return True +@attr.s(slots=True) +class RulesForRoomData: + """The data stored in the cache by `RulesForRoom`. + + We don't store `RulesForRoom` directly in the cache as we want our caches to + *only* include data, and not references to e.g. the data stores. + """ + + # event_id -> (user_id, state) + member_map = attr.ib(type=Dict[str, Tuple[str, str]], factory=dict) + # user_id -> rules + rules_by_user = attr.ib(type=Dict[str, List[Dict[str, dict]]], factory=dict) + + # The last state group we updated the caches for. If the state_group of + # a new event comes along, we know that we can just return the cached + # result. + # On invalidation of the rules themselves (if the user changes them), + # we invalidate everything and set state_group to `object()` + state_group = attr.ib(type=Union[object, int], factory=object) + + # A sequence number to keep track of when we're allowed to update the + # cache. We bump the sequence number when we invalidate the cache. If + # the sequence number changes while we're calculating stuff we should + # not update the cache with it. + sequence = attr.ib(type=int, default=0) + + # A cache of user_ids that we *know* aren't interesting, e.g. user_ids + # owned by AS's, or remote users, etc. (I.e. users we will never need to + # calculate push for) + # These never need to be invalidated as we will never set up push for + # them. + uninteresting_user_set = attr.ib(type=Set[str], factory=set) + + class RulesForRoom: """Caches push rules for users in a room. This efficiently handles users joining/leaving the room by not invalidating the entire cache for the room. + + A new instance is constructed for each call to + `BulkPushRuleEvaluator._get_rules_for_event`, with the cached data from + previous calls passed in. """ def __init__( @@ -295,6 +341,8 @@ class RulesForRoom: room_id: str, rules_for_room_cache: LruCache, room_push_rule_cache_metrics: CacheMetric, + linearizer: Linearizer, + cached_data: RulesForRoomData, ): """ Args: @@ -303,38 +351,21 @@ class RulesForRoom: rules_for_room_cache: The cache object that caches these RoomsForUser objects. room_push_rule_cache_metrics: The metrics object + linearizer: The linearizer used to ensure only one thing mutates + the cache at a time. Keyed off room_id + cached_data: Cached data from previous calls to `self.get_rules`, + can be mutated. """ self.room_id = room_id self.is_mine_id = hs.is_mine_id self.store = hs.get_datastore() self.room_push_rule_cache_metrics = room_push_rule_cache_metrics - self.linearizer = Linearizer(name="rules_for_room") - - # event_id -> (user_id, state) - self.member_map = {} # type: Dict[str, Tuple[str, str]] - # user_id -> rules - self.rules_by_user = {} # type: Dict[str, List[Dict[str, dict]]] - - # The last state group we updated the caches for. If the state_group of - # a new event comes along, we know that we can just return the cached - # result. - # On invalidation of the rules themselves (if the user changes them), - # we invalidate everything and set state_group to `object()` - self.state_group = object() - - # A sequence number to keep track of when we're allowed to update the - # cache. We bump the sequence number when we invalidate the cache. If - # the sequence number changes while we're calculating stuff we should - # not update the cache with it. - self.sequence = 0 - - # A cache of user_ids that we *know* aren't interesting, e.g. user_ids - # owned by AS's, or remote users, etc. (I.e. users we will never need to - # calculate push for) - # These never need to be invalidated as we will never set up push for - # them. - self.uninteresting_user_set = set() # type: Set[str] + # Used to ensure only one thing mutates the cache at a time. Keyed off + # room_id. + self.linearizer = linearizer + + self.data = cached_data # We need to be clever on the invalidating caches callbacks, as # otherwise the invalidation callback holds a reference to the object, @@ -352,25 +383,25 @@ class RulesForRoom: """ state_group = context.state_group - if state_group and self.state_group == state_group: + if state_group and self.data.state_group == state_group: logger.debug("Using cached rules for %r", self.room_id) self.room_push_rule_cache_metrics.inc_hits() - return self.rules_by_user + return self.data.rules_by_user - with (await self.linearizer.queue(())): - if state_group and self.state_group == state_group: + with (await self.linearizer.queue(self.room_id)): + if state_group and self.data.state_group == state_group: logger.debug("Using cached rules for %r", self.room_id) self.room_push_rule_cache_metrics.inc_hits() - return self.rules_by_user + return self.data.rules_by_user self.room_push_rule_cache_metrics.inc_misses() ret_rules_by_user = {} missing_member_event_ids = {} - if state_group and self.state_group == context.prev_group: + if state_group and self.data.state_group == context.prev_group: # If we have a simple delta then we can reuse most of the previous # results. - ret_rules_by_user = self.rules_by_user + ret_rules_by_user = self.data.rules_by_user current_state_ids = context.delta_ids push_rules_delta_state_cache_metric.inc_hits() @@ -393,24 +424,24 @@ class RulesForRoom: if typ != EventTypes.Member: continue - if user_id in self.uninteresting_user_set: + if user_id in self.data.uninteresting_user_set: continue if not self.is_mine_id(user_id): - self.uninteresting_user_set.add(user_id) + self.data.uninteresting_user_set.add(user_id) continue if self.store.get_if_app_services_interested_in_user(user_id): - self.uninteresting_user_set.add(user_id) + self.data.uninteresting_user_set.add(user_id) continue event_id = current_state_ids[key] - res = self.member_map.get(event_id, None) + res = self.data.member_map.get(event_id, None) if res: user_id, state = res if state == Membership.JOIN: - rules = self.rules_by_user.get(user_id, None) + rules = self.data.rules_by_user.get(user_id, None) if rules: ret_rules_by_user[user_id] = rules continue @@ -430,7 +461,7 @@ class RulesForRoom: else: # The push rules didn't change but lets update the cache anyway self.update_cache( - self.sequence, + self.data.sequence, members={}, # There were no membership changes rules_by_user=ret_rules_by_user, state_group=state_group, @@ -461,7 +492,7 @@ class RulesForRoom: for. Used when updating the cache. event: The event we are currently computing push rules for. """ - sequence = self.sequence + sequence = self.data.sequence rows = await self.store.get_membership_from_event_ids(member_event_ids.values()) @@ -501,23 +532,11 @@ class RulesForRoom: self.update_cache(sequence, members, ret_rules_by_user, state_group) - def invalidate_all(self) -> None: - # Note: Don't hand this function directly to an invalidation callback - # as it keeps a reference to self and will stop this instance from being - # GC'd if it gets dropped from the rules_to_user cache. Instead use - # `self.invalidate_all_cb` - logger.debug("Invalidating RulesForRoom for %r", self.room_id) - self.sequence += 1 - self.state_group = object() - self.member_map = {} - self.rules_by_user = {} - push_rules_invalidation_counter.inc() - def update_cache(self, sequence, members, rules_by_user, state_group) -> None: - if sequence == self.sequence: - self.member_map.update(members) - self.rules_by_user = rules_by_user - self.state_group = state_group + if sequence == self.data.sequence: + self.data.member_map.update(members) + self.data.rules_by_user = rules_by_user + self.data.state_group = state_group @attr.attrs(slots=True, frozen=True) @@ -535,6 +554,10 @@ class _Invalidation: room_id = attr.ib(type=str) def __call__(self) -> None: - rules = self.cache.get(self.room_id, None, update_metrics=False) - if rules: - rules.invalidate_all() + rules_data = self.cache.get(self.room_id, None, update_metrics=False) + if rules_data: + rules_data.sequence += 1 + rules_data.state_group = object() + rules_data.member_map = {} + rules_data.rules_by_user = {} + push_rules_invalidation_counter.inc() diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py index bd8513cd43..2a8532f8c1 100644 --- a/synapse/storage/databases/main/roommember.py +++ b/synapse/storage/databases/main/roommember.py @@ -23,8 +23,11 @@ from typing import ( Optional, Set, Tuple, + Union, ) +import attr + from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext @@ -43,7 +46,7 @@ from synapse.storage.roommember import ( ProfileInfo, RoomsForUser, ) -from synapse.types import PersistedEventPosition, get_domain_from_id +from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches import intern_string from synapse.util.caches.descriptors import _CacheContext, cached, cachedList @@ -63,6 +66,10 @@ class RoomMemberWorkerStore(EventsWorkerStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) + # Used by `_get_joined_hosts` to ensure only one thing mutates the cache + # at a time. Keyed by room_id. + self._joined_host_linearizer = Linearizer("_JoinedHostsCache") + # Is the current_state_events.membership up to date? Or is the # background update still running? self._current_state_events_membership_up_to_date = False @@ -740,19 +747,82 @@ class RoomMemberWorkerStore(EventsWorkerStore): @cached(num_args=2, max_entries=10000, iterable=True) async def _get_joined_hosts( - self, room_id, state_group, current_state_ids, state_entry - ): - # We don't use `state_group`, its there so that we can cache based - # on it. However, its important that its never None, since two current_state's - # with a state_group of None are likely to be different. + self, + room_id: str, + state_group: int, + current_state_ids: StateMap[str], + state_entry: "_StateCacheEntry", + ) -> FrozenSet[str]: + # We don't use `state_group`, its there so that we can cache based on + # it. However, its important that its never None, since two + # current_state's with a state_group of None are likely to be different. + # + # The `state_group` must match the `state_entry.state_group` (if not None). assert state_group is not None - + assert state_entry.state_group is None or state_entry.state_group == state_group + + # We use a secondary cache of previous work to allow us to build up the + # joined hosts for the given state group based on previous state groups. + # + # We cache one object per room containing the results of the last state + # group we got joined hosts for. The idea is that generally + # `get_joined_hosts` is called with the "current" state group for the + # room, and so consecutive calls will be for consecutive state groups + # which point to the previous state group. cache = await self._get_joined_hosts_cache(room_id) - return await cache.get_destinations(state_entry) + + # If the state group in the cache matches, we already have the data we need. + if state_entry.state_group == cache.state_group: + return frozenset(cache.hosts_to_joined_users) + + # Since we'll mutate the cache we need to lock. + with (await self._joined_host_linearizer.queue(room_id)): + if state_entry.state_group == cache.state_group: + # Same state group, so nothing to do. We've already checked for + # this above, but the cache may have changed while waiting on + # the lock. + pass + elif state_entry.prev_group == cache.state_group: + # The cached work is for the previous state group, so we work out + # the delta. + for (typ, state_key), event_id in state_entry.delta_ids.items(): + if typ != EventTypes.Member: + continue + + host = intern_string(get_domain_from_id(state_key)) + user_id = state_key + known_joins = cache.hosts_to_joined_users.setdefault(host, set()) + + event = await self.get_event(event_id) + if event.membership == Membership.JOIN: + known_joins.add(user_id) + else: + known_joins.discard(user_id) + + if not known_joins: + cache.hosts_to_joined_users.pop(host, None) + else: + # The cache doesn't match the state group or prev state group, + # so we calculate the result from first principles. + joined_users = await self.get_joined_users_from_state( + room_id, state_entry + ) + + cache.hosts_to_joined_users = {} + for user_id in joined_users: + host = intern_string(get_domain_from_id(user_id)) + cache.hosts_to_joined_users.setdefault(host, set()).add(user_id) + + if state_entry.state_group: + cache.state_group = state_entry.state_group + else: + cache.state_group = object() + + return frozenset(cache.hosts_to_joined_users) @cached(max_entries=10000) def _get_joined_hosts_cache(self, room_id: str) -> "_JoinedHostsCache": - return _JoinedHostsCache(self, room_id) + return _JoinedHostsCache() @cached(num_args=2) async def did_forget(self, user_id: str, room_id: str) -> bool: @@ -1062,71 +1132,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore): await self.db_pool.runInteraction("forget_membership", f) +@attr.s(slots=True) class _JoinedHostsCache: - """Cache for joined hosts in a room that is optimised to handle updates - via state deltas. - """ - - def __init__(self, store, room_id): - self.store = store - self.room_id = room_id + """The cached data used by the `_get_joined_hosts_cache`.""" - self.hosts_to_joined_users = {} + # Dict of host to the set of their users in the room at the state group. + hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict) - self.state_group = object() - - self.linearizer = Linearizer("_JoinedHostsCache") - - self._len = 0 - - async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]: - """Get set of destinations for a state entry - - Args: - state_entry - - Returns: - The destinations as a set. - """ - if state_entry.state_group == self.state_group: - return frozenset(self.hosts_to_joined_users) - - with (await self.linearizer.queue(())): - if state_entry.state_group == self.state_group: - pass - elif state_entry.prev_group == self.state_group: - for (typ, state_key), event_id in state_entry.delta_ids.items(): - if typ != EventTypes.Member: - continue - - host = intern_string(get_domain_from_id(state_key)) - user_id = state_key - known_joins = self.hosts_to_joined_users.setdefault(host, set()) - - event = await self.store.get_event(event_id) - if event.membership == Membership.JOIN: - known_joins.add(user_id) - else: - known_joins.discard(user_id) - - if not known_joins: - self.hosts_to_joined_users.pop(host, None) - else: - joined_users = await self.store.get_joined_users_from_state( - self.room_id, state_entry - ) - - self.hosts_to_joined_users = {} - for user_id in joined_users: - host = intern_string(get_domain_from_id(user_id)) - self.hosts_to_joined_users.setdefault(host, set()).add(user_id) - - if state_entry.state_group: - self.state_group = state_entry.state_group - else: - self.state_group = object() - self._len = sum(len(v) for v in self.hosts_to_joined_users.values()) - return frozenset(self.hosts_to_joined_users) + # The state group `hosts_to_joined_users` is derived from. Will be an object + # if the instance is newly created or if the state is not based on a state + # group. (An object is used as a sentinel value to ensure that it never is + # equal to anything else). + state_group = attr.ib(type=Union[object, int], factory=object) def __len__(self): - return self._len + return sum(len(v) for v in self.hosts_to_joined_users.values()) -- cgit 1.5.1 From 9d25a0ae65ce8728d0fda1eebaf0b469316f84d7 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 23 Apr 2021 12:21:55 +0100 Subject: Split presence out of master (#9820) --- changelog.d/9820.feature | 1 + scripts/synapse_port_db | 7 +- synapse/app/generic_worker.py | 31 +------- synapse/config/workers.py | 27 ++++++- synapse/handlers/presence.py | 56 ++++++++----- synapse/replication/http/_base.py | 5 +- synapse/replication/slave/storage/presence.py | 50 ------------ synapse/replication/tcp/handler.py | 18 ++++- synapse/replication/tcp/streams/_base.py | 17 ++-- synapse/rest/client/v1/presence.py | 7 +- synapse/server.py | 6 +- synapse/storage/databases/main/__init__.py | 47 +---------- synapse/storage/databases/main/presence.py | 92 +++++++++++++++++++++- .../schema/delta/59/12presence_stream_instance.sql | 18 +++++ .../59/12presence_stream_instance_seq.sql.postgres | 20 +++++ tests/app/test_frontend_proxy.py | 83 ------------------- tests/rest/client/v1/test_presence.py | 5 +- 17 files changed, 245 insertions(+), 245 deletions(-) create mode 100644 changelog.d/9820.feature delete mode 100644 synapse/replication/slave/storage/presence.py create mode 100644 synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql create mode 100644 synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres delete mode 100644 tests/app/test_frontend_proxy.py (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9820.feature b/changelog.d/9820.feature new file mode 100644 index 0000000000..f56b0bb3bd --- /dev/null +++ b/changelog.d/9820.feature @@ -0,0 +1 @@ +Add experimental support for handling presence on a worker. diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index b7c1ffc956..f0c93d5226 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -634,8 +634,11 @@ class Porter(object): "device_inbox_sequence", ("device_inbox", "device_federation_outbox") ) await self._setup_sequence( - "account_data_sequence", ("room_account_data", "room_tags_revisions", "account_data")) - await self._setup_sequence("receipts_sequence", ("receipts_linearized", )) + "account_data_sequence", + ("room_account_data", "room_tags_revisions", "account_data"), + ) + await self._setup_sequence("receipts_sequence", ("receipts_linearized",)) + await self._setup_sequence("presence_stream_sequence", ("presence_stream",)) await self._setup_auth_chain_sequence() # Step 3. Get tables. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index 26c458dbb6..7b2ac3ca64 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -55,7 +55,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore from synapse.replication.slave.storage.filtering import SlavedFilteringStore from synapse.replication.slave.storage.groups import SlavedGroupServerStore from synapse.replication.slave.storage.keys import SlavedKeyStore -from synapse.replication.slave.storage.presence import SlavedPresenceStore from synapse.replication.slave.storage.profile import SlavedProfileStore from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore from synapse.replication.slave.storage.pushers import SlavedPusherStore @@ -64,7 +63,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto from synapse.replication.slave.storage.room import RoomStore from synapse.replication.slave.storage.transactions import SlavedTransactionStore from synapse.rest.admin import register_servlets_for_media_repo -from synapse.rest.client.v1 import events, login, room +from synapse.rest.client.v1 import events, login, presence, room from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet from synapse.rest.client.v1.profile import ( ProfileAvatarURLRestServlet, @@ -110,6 +109,7 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore from synapse.storage.databases.main.monthly_active_users import ( MonthlyActiveUsersWorkerStore, ) +from synapse.storage.databases.main.presence import PresenceStore from synapse.storage.databases.main.search import SearchWorkerStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.transactions import TransactionWorkerStore @@ -121,26 +121,6 @@ from synapse.util.versionstring import get_version_string logger = logging.getLogger("synapse.app.generic_worker") -class PresenceStatusStubServlet(RestServlet): - """If presence is disabled this servlet can be used to stub out setting - presence status. - """ - - PATTERNS = client_patterns("/presence/(?P[^/]*)/status") - - def __init__(self, hs): - super().__init__() - self.auth = hs.get_auth() - - async def on_GET(self, request, user_id): - await self.auth.get_user_by_req(request) - return 200, {"presence": "offline"} - - async def on_PUT(self, request, user_id): - await self.auth.get_user_by_req(request) - return 200, {} - - class KeyUploadServlet(RestServlet): """An implementation of the `KeyUploadServlet` that responds to read only requests, but otherwise proxies through to the master instance. @@ -241,6 +221,7 @@ class GenericWorkerSlavedStore( StatsStore, UIAuthWorkerStore, EndToEndRoomKeyStore, + PresenceStore, SlavedDeviceInboxStore, SlavedDeviceStore, SlavedReceiptsStore, @@ -259,7 +240,6 @@ class GenericWorkerSlavedStore( SlavedTransactionStore, SlavedProfileStore, SlavedClientIpStore, - SlavedPresenceStore, SlavedFilteringStore, MonthlyActiveUsersWorkerStore, MediaRepositoryStore, @@ -327,10 +307,7 @@ class GenericWorkerServer(HomeServer): user_directory.register_servlets(self, resource) - # If presence is disabled, use the stub servlet that does - # not allow sending presence - if not self.config.use_presence: - PresenceStatusStubServlet(self).register(resource) + presence.register_servlets(self, resource) groups.register_servlets(self, resource) diff --git a/synapse/config/workers.py b/synapse/config/workers.py index b2540163d1..462630201d 100644 --- a/synapse/config/workers.py +++ b/synapse/config/workers.py @@ -64,6 +64,14 @@ class WriterLocations: Attributes: events: The instances that write to the event and backfill streams. typing: The instance that writes to the typing stream. + to_device: The instances that write to the to_device stream. Currently + can only be a single instance. + account_data: The instances that write to the account data streams. Currently + can only be a single instance. + receipts: The instances that write to the receipts stream. Currently + can only be a single instance. + presence: The instances that write to the presence stream. Currently + can only be a single instance. """ events = attr.ib( @@ -85,6 +93,11 @@ class WriterLocations: type=List[str], converter=_instance_to_list_converter, ) + presence = attr.ib( + default=["master"], + type=List[str], + converter=_instance_to_list_converter, + ) class WorkerConfig(Config): @@ -188,7 +201,14 @@ class WorkerConfig(Config): # Check that the configured writers for events and typing also appears in # `instance_map`. - for stream in ("events", "typing", "to_device", "account_data", "receipts"): + for stream in ( + "events", + "typing", + "to_device", + "account_data", + "receipts", + "presence", + ): instances = _instance_to_list_converter(getattr(self.writers, stream)) for instance in instances: if instance != "master" and instance not in self.instance_map: @@ -215,6 +235,11 @@ class WorkerConfig(Config): if len(self.writers.events) == 0: raise ConfigError("Must specify at least one instance to handle `events`.") + if len(self.writers.presence) != 1: + raise ConfigError( + "Must only specify one instance to handle `presence` messages." + ) + self.events_shard_config = RoutableShardedWorkerHandlingConfig( self.writers.events ) diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 7fd28ffa54..9938be3821 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -122,7 +122,8 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER class BasePresenceHandler(abc.ABC): - """Parts of the PresenceHandler that are shared between workers and master""" + """Parts of the PresenceHandler that are shared between workers and presence + writer""" def __init__(self, hs: "HomeServer"): self.clock = hs.get_clock() @@ -309,8 +310,16 @@ class WorkerPresenceHandler(BasePresenceHandler): super().__init__(hs) self.hs = hs + self._presence_writer_instance = hs.config.worker.writers.presence[0] + self._presence_enabled = hs.config.use_presence + # Route presence EDUs to the right worker + hs.get_federation_registry().register_instances_for_edu( + "m.presence", + hs.config.worker.writers.presence, + ) + # The number of ongoing syncs on this process, by user id. # Empty if _presence_enabled is false. self._user_to_num_current_syncs = {} # type: Dict[str, int] @@ -318,8 +327,8 @@ class WorkerPresenceHandler(BasePresenceHandler): self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() - # user_id -> last_sync_ms. Lists the users that have stopped syncing - # but we haven't notified the master of that yet + # user_id -> last_sync_ms. Lists the users that have stopped syncing but + # we haven't notified the presence writer of that yet self.users_going_offline = {} self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) @@ -352,22 +361,23 @@ class WorkerPresenceHandler(BasePresenceHandler): ) def mark_as_coming_online(self, user_id): - """A user has started syncing. Send a UserSync to the master, unless they - had recently stopped syncing. + """A user has started syncing. Send a UserSync to the presence writer, + unless they had recently stopped syncing. Args: user_id (str) """ going_offline = self.users_going_offline.pop(user_id, None) if not going_offline: - # Safe to skip because we haven't yet told the master they were offline + # Safe to skip because we haven't yet told the presence writer they + # were offline self.send_user_sync(user_id, True, self.clock.time_msec()) def mark_as_going_offline(self, user_id): - """A user has stopped syncing. We wait before notifying the master as - its likely they'll come back soon. This allows us to avoid sending - a stopped syncing immediately followed by a started syncing notification - to the master + """A user has stopped syncing. We wait before notifying the presence + writer as its likely they'll come back soon. This allows us to avoid + sending a stopped syncing immediately followed by a started syncing + notification to the presence writer Args: user_id (str) @@ -375,8 +385,8 @@ class WorkerPresenceHandler(BasePresenceHandler): self.users_going_offline[user_id] = self.clock.time_msec() def send_stop_syncing(self): - """Check if there are any users who have stopped syncing a while ago - and haven't come back yet. If there are poke the master about them. + """Check if there are any users who have stopped syncing a while ago and + haven't come back yet. If there are poke the presence writer about them. """ now = self.clock.time_msec() for user_id, last_sync_ms in list(self.users_going_offline.items()): @@ -492,9 +502,12 @@ class WorkerPresenceHandler(BasePresenceHandler): if not self.hs.config.use_presence: return - # Proxy request to master + # Proxy request to instance that writes presence await self._set_state_client( - user_id=user_id, state=state, ignore_status_msg=ignore_status_msg + instance_name=self._presence_writer_instance, + user_id=user_id, + state=state, + ignore_status_msg=ignore_status_msg, ) async def bump_presence_active_time(self, user): @@ -505,9 +518,11 @@ class WorkerPresenceHandler(BasePresenceHandler): if not self.hs.config.use_presence: return - # Proxy request to master + # Proxy request to instance that writes presence user_id = user.to_string() - await self._bump_active_client(user_id=user_id) + await self._bump_active_client( + instance_name=self._presence_writer_instance, user_id=user_id + ) class PresenceHandler(BasePresenceHandler): @@ -1909,7 +1924,7 @@ class PresenceFederationQueue: self._queue_presence_updates = True # Whether this instance is a presence writer. - self._presence_writer = hs.config.worker.worker_app is None + self._presence_writer = self._instance_name in hs.config.worker.writers.presence # The FederationSender instance, if this process sends federation traffic directly. self._federation = None @@ -1957,7 +1972,7 @@ class PresenceFederationQueue: Will forward to the local federation sender (if there is one) and queue to send over replication (if there are other federation sender instances.). - Must only be called on the master process. + Must only be called on the presence writer process. """ # This should only be called on a presence writer. @@ -2003,10 +2018,11 @@ class PresenceFederationQueue: We return rows in the form of `(destination, user_id)` to keep the size of each row bounded (rather than returning the sets in a row). - On workers this will query the master process via HTTP replication. + On workers this will query the presence writer process via HTTP replication. """ if instance_name != self._instance_name: - # If not local we query over http replication from the master + # If not local we query over http replication from the presence + # writer result = await self._repl_client( instance_name=instance_name, stream_name=PresenceFederationStream.NAME, diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py index ece03467b5..5685cf2121 100644 --- a/synapse/replication/http/_base.py +++ b/synapse/replication/http/_base.py @@ -158,7 +158,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta): def make_client(cls, hs): """Create a client that makes requests. - Returns a callable that accepts the same parameters as `_serialize_payload`. + Returns a callable that accepts the same parameters as + `_serialize_payload`, and also accepts an optional `instance_name` + parameter to specify which instance to hit (the instance must be in + the `instance_map` config). """ clock = hs.get_clock() client = hs.get_simple_http_client() diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py deleted file mode 100644 index 57327d910d..0000000000 --- a/synapse/replication/slave/storage/presence.py +++ /dev/null @@ -1,50 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.replication.tcp.streams import PresenceStream -from synapse.storage import DataStore -from synapse.storage.database import DatabasePool -from synapse.storage.databases.main.presence import PresenceStore -from synapse.util.caches.stream_change_cache import StreamChangeCache - -from ._base import BaseSlavedStore -from ._slaved_id_tracker import SlavedIdTracker - - -class SlavedPresenceStore(BaseSlavedStore): - def __init__(self, database: DatabasePool, db_conn, hs): - super().__init__(database, db_conn, hs) - self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id") - - self._presence_on_startup = self._get_active_presence(db_conn) # type: ignore - - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", self._presence_id_gen.get_current_token() - ) - - _get_active_presence = DataStore._get_active_presence - take_presence_startup_info = DataStore.take_presence_startup_info - _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"] - get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"] - - def get_current_presence_token(self): - return self._presence_id_gen.get_current_token() - - def process_replication_rows(self, stream_name, instance_name, token, rows): - if stream_name == PresenceStream.NAME: - self._presence_id_gen.advance(instance_name, token) - for row in rows: - self.presence_stream_cache.entity_has_changed(row.user_id, token) - self._get_presence_for_user.invalidate((row.user_id,)) - return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 2ce1b9f222..7ced4c543c 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -55,6 +55,8 @@ from synapse.replication.tcp.streams import ( CachesStream, EventsStream, FederationStream, + PresenceFederationStream, + PresenceStream, ReceiptsStream, Stream, TagAccountDataStream, @@ -99,6 +101,10 @@ class ReplicationCommandHandler: self._instance_id = hs.get_instance_id() self._instance_name = hs.get_instance_name() + self._is_presence_writer = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + self._streams = { stream.NAME: stream(hs) for stream in STREAMS_MAP.values() } # type: Dict[str, Stream] @@ -153,6 +159,14 @@ class ReplicationCommandHandler: continue + if isinstance(stream, (PresenceStream, PresenceFederationStream)): + # Only add PresenceStream as a source on the instance in charge + # of presence. + if self._is_presence_writer: + self._streams_to_replicate.append(stream) + + continue + # Only add any other streams if we're on master. if hs.config.worker_app is not None: continue @@ -350,7 +364,7 @@ class ReplicationCommandHandler: ) -> Optional[Awaitable[None]]: user_sync_counter.inc() - if self._is_master: + if self._is_presence_writer: return self._presence_handler.update_external_syncs_row( cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms ) @@ -360,7 +374,7 @@ class ReplicationCommandHandler: def on_CLEAR_USER_SYNC( self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand ) -> Optional[Awaitable[None]]: - if self._is_master: + if self._is_presence_writer: return self._presence_handler.update_external_syncs_clear(cmd.instance_id) else: return None diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 9d75a89f1c..b03824925a 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -272,15 +272,22 @@ class PresenceStream(Stream): NAME = "presence" ROW_TYPE = PresenceStreamRow - def __init__(self, hs): + def __init__(self, hs: "HomeServer"): store = hs.get_datastore() - if hs.config.worker_app is None: - # on the master, query the presence handler + if hs.get_instance_name() in hs.config.worker.writers.presence: + # on the presence writer, query the presence handler presence_handler = hs.get_presence_handler() - update_function = presence_handler.get_all_presence_updates + + from synapse.handlers.presence import PresenceHandler + + assert isinstance(presence_handler, PresenceHandler) + + update_function = ( + presence_handler.get_all_presence_updates + ) # type: UpdateFunction else: - # Query master process + # Query presence writer process update_function = make_http_update_function(hs, self.NAME) super().__init__( diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py index c232484f29..2b24fe5aa6 100644 --- a/synapse/rest/client/v1/presence.py +++ b/synapse/rest/client/v1/presence.py @@ -35,10 +35,15 @@ class PresenceStatusRestServlet(RestServlet): self.clock = hs.get_clock() self.auth = hs.get_auth() + self._use_presence = hs.config.server.use_presence + async def on_GET(self, request, user_id): requester = await self.auth.get_user_by_req(request) user = UserID.from_string(user_id) + if not self._use_presence: + return 200, {"presence": "offline"} + if requester.user != user: allowed = await self.presence_handler.is_visible( observed_user=user, observer_user=requester.user @@ -80,7 +85,7 @@ class PresenceStatusRestServlet(RestServlet): except Exception: raise SynapseError(400, "Unable to parse state") - if self.hs.config.use_presence: + if self._use_presence: await self.presence_handler.set_state(user, state) return 200, {} diff --git a/synapse/server.py b/synapse/server.py index 67598fffe3..8c147be2b3 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -418,10 +418,10 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_presence_handler(self) -> BasePresenceHandler: - if self.config.worker_app: - return WorkerPresenceHandler(self) - else: + if self.get_instance_name() in self.config.worker.writers.presence: return PresenceHandler(self) + else: + return WorkerPresenceHandler(self) @cache_in_self def get_typing_writer_handler(self) -> TypingWriterHandler: diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 5c50f5f950..49c7606d51 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -17,7 +17,6 @@ import logging from typing import List, Optional, Tuple -from synapse.api.constants import PresenceState from synapse.config.homeserver import HomeServerConfig from synapse.storage.database import DatabasePool from synapse.storage.databases.main.stats import UserSortOrder @@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore from .metrics import ServerMetricsStore from .monthly_active_users import MonthlyActiveUsersStore from .openid import OpenIdStore -from .presence import PresenceStore, UserPresenceState +from .presence import PresenceStore from .profile import ProfileStore from .purge_events import PurgeEventsStore from .push_rule import PushRuleStore @@ -126,9 +125,6 @@ class DataStore( self._clock = hs.get_clock() self.database_engine = database.engine - self._presence_id_gen = StreamIdGenerator( - db_conn, "presence_stream", "stream_id" - ) self._public_room_id_gen = StreamIdGenerator( db_conn, "public_room_list_stream", "stream_id" ) @@ -177,21 +173,6 @@ class DataStore( super().__init__(database, db_conn, hs) - self._presence_on_startup = self._get_active_presence(db_conn) - - presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( - db_conn, - "presence_stream", - entity_column="user_id", - stream_column="stream_id", - max_value=self._presence_id_gen.get_current_token(), - ) - self.presence_stream_cache = StreamChangeCache( - "PresenceStreamChangeCache", - min_presence_val, - prefilled_cache=presence_cache_prefill, - ) - device_list_max = self._device_list_id_gen.get_current_token() self._device_list_stream_cache = StreamChangeCache( "DeviceListStreamChangeCache", device_list_max @@ -238,32 +219,6 @@ class DataStore( def get_device_stream_token(self) -> int: return self._device_list_id_gen.get_current_token() - def take_presence_startup_info(self): - active_on_startup = self._presence_on_startup - self._presence_on_startup = None - return active_on_startup - - def _get_active_presence(self, db_conn): - """Fetch non-offline presence from the database so that we can register - the appropriate time outs. - """ - - sql = ( - "SELECT user_id, state, last_active_ts, last_federation_update_ts," - " last_user_sync_ts, status_msg, currently_active FROM presence_stream" - " WHERE state != ?" - ) - - txn = db_conn.cursor() - txn.execute(sql, (PresenceState.OFFLINE,)) - rows = self.db_pool.cursor_to_dict(txn) - txn.close() - - for row in rows: - row["currently_active"] = bool(row["currently_active"]) - - return [UserPresenceState(**row) for row in rows] - async def get_users(self) -> List[JsonDict]: """Function to retrieve a list of users in users table. diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py index c207d917b1..db22fab23e 100644 --- a/synapse/storage/databases/main/presence.py +++ b/synapse/storage/databases/main/presence.py @@ -12,16 +12,69 @@ # See the License for the specific language governing permissions and # limitations under the License. -from typing import Dict, List, Tuple +from typing import TYPE_CHECKING, Dict, List, Tuple -from synapse.api.presence import UserPresenceState +from synapse.api.presence import PresenceState, UserPresenceState +from synapse.replication.tcp.streams import PresenceStream from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause +from synapse.storage.database import DatabasePool +from synapse.storage.engines import PostgresEngine +from synapse.storage.types import Connection +from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator from synapse.util.caches.descriptors import cached, cachedList +from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.iterutils import batch_iter +if TYPE_CHECKING: + from synapse.server import HomeServer + class PresenceStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: Connection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self._can_persist_presence = ( + hs.get_instance_name() in hs.config.worker.writers.presence + ) + + if isinstance(database.engine, PostgresEngine): + self._presence_id_gen = MultiWriterIdGenerator( + db_conn=db_conn, + db=database, + stream_name="presence_stream", + instance_name=self._instance_name, + tables=[("presence_stream", "instance_name", "stream_id")], + sequence_name="presence_stream_sequence", + writers=hs.config.worker.writers.to_device, + ) + else: + self._presence_id_gen = StreamIdGenerator( + db_conn, "presence_stream", "stream_id" + ) + + self._presence_on_startup = self._get_active_presence(db_conn) + + presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict( + db_conn, + "presence_stream", + entity_column="user_id", + stream_column="stream_id", + max_value=self._presence_id_gen.get_current_token(), + ) + self.presence_stream_cache = StreamChangeCache( + "PresenceStreamChangeCache", + min_presence_val, + prefilled_cache=presence_cache_prefill, + ) + async def update_presence(self, presence_states): + assert self._can_persist_presence + stream_ordering_manager = self._presence_id_gen.get_next_mult( len(presence_states) ) @@ -57,6 +110,7 @@ class PresenceStore(SQLBaseStore): "last_user_sync_ts": state.last_user_sync_ts, "status_msg": state.status_msg, "currently_active": state.currently_active, + "instance_name": self._instance_name, } for stream_id, state in zip(stream_orderings, presence_states) ], @@ -216,3 +270,37 @@ class PresenceStore(SQLBaseStore): def get_current_presence_token(self): return self._presence_id_gen.get_current_token() + + def _get_active_presence(self, db_conn: Connection): + """Fetch non-offline presence from the database so that we can register + the appropriate time outs. + """ + + sql = ( + "SELECT user_id, state, last_active_ts, last_federation_update_ts," + " last_user_sync_ts, status_msg, currently_active FROM presence_stream" + " WHERE state != ?" + ) + + txn = db_conn.cursor() + txn.execute(sql, (PresenceState.OFFLINE,)) + rows = self.db_pool.cursor_to_dict(txn) + txn.close() + + for row in rows: + row["currently_active"] = bool(row["currently_active"]) + + return [UserPresenceState(**row) for row in rows] + + def take_presence_startup_info(self): + active_on_startup = self._presence_on_startup + self._presence_on_startup = None + return active_on_startup + + def process_replication_rows(self, stream_name, instance_name, token, rows): + if stream_name == PresenceStream.NAME: + self._presence_id_gen.advance(instance_name, token) + for row in rows: + self.presence_stream_cache.entity_has_changed(row.user_id, token) + self._get_presence_for_user.invalidate((row.user_id,)) + return super().process_replication_rows(stream_name, instance_name, token, rows) diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql new file mode 100644 index 0000000000..b6ba0bda1a --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql @@ -0,0 +1,18 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Add a column to specify which instance wrote the row. Historic rows have +-- `NULL`, which indicates that the master instance wrote them. +ALTER TABLE presence_stream ADD COLUMN instance_name TEXT; diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres new file mode 100644 index 0000000000..02b182adf9 --- /dev/null +++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres @@ -0,0 +1,20 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence; + +SELECT setval('presence_stream_sequence', ( + SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream +)); diff --git a/tests/app/test_frontend_proxy.py b/tests/app/test_frontend_proxy.py deleted file mode 100644 index 3d45da38ab..0000000000 --- a/tests/app/test_frontend_proxy.py +++ /dev/null @@ -1,83 +0,0 @@ -# Copyright 2018 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.app.generic_worker import GenericWorkerServer - -from tests.server import make_request -from tests.unittest import HomeserverTestCase - - -class FrontendProxyTests(HomeserverTestCase): - def make_homeserver(self, reactor, clock): - - hs = self.setup_test_homeserver( - federation_http_client=None, homeserver_to_use=GenericWorkerServer - ) - - return hs - - def default_config(self): - c = super().default_config() - c["worker_app"] = "synapse.app.frontend_proxy" - - c["worker_listeners"] = [ - { - "type": "http", - "port": 8080, - "bind_addresses": ["0.0.0.0"], - "resources": [{"names": ["client"]}], - } - ] - - return c - - def test_listen_http_with_presence_enabled(self): - """ - When presence is on, the stub servlet will not register. - """ - # Presence is on - self.hs.config.use_presence = True - - # Listen with the config - self.hs._listen_http(self.hs.config.worker.worker_listeners[0]) - - # Grab the resource from the site that was told to listen - self.assertEqual(len(self.reactor.tcpServers), 1) - site = self.reactor.tcpServers[0][1] - - channel = make_request(self.reactor, site, "PUT", "presence/a/status") - - # 400 + unrecognised, because nothing is registered - self.assertEqual(channel.code, 400) - self.assertEqual(channel.json_body["errcode"], "M_UNRECOGNIZED") - - def test_listen_http_with_presence_disabled(self): - """ - When presence is off, the stub servlet will register. - """ - # Presence is off - self.hs.config.use_presence = False - - # Listen with the config - self.hs._listen_http(self.hs.config.worker.worker_listeners[0]) - - # Grab the resource from the site that was told to listen - self.assertEqual(len(self.reactor.tcpServers), 1) - site = self.reactor.tcpServers[0][1] - - channel = make_request(self.reactor, site, "PUT", "presence/a/status") - - # 401, because the stub servlet still checks authentication - self.assertEqual(channel.code, 401) - self.assertEqual(channel.json_body["errcode"], "M_MISSING_TOKEN") diff --git a/tests/rest/client/v1/test_presence.py b/tests/rest/client/v1/test_presence.py index 3a050659ca..409f3949dc 100644 --- a/tests/rest/client/v1/test_presence.py +++ b/tests/rest/client/v1/test_presence.py @@ -16,6 +16,7 @@ from unittest.mock import Mock from twisted.internet import defer +from synapse.handlers.presence import PresenceHandler from synapse.rest.client.v1 import presence from synapse.types import UserID @@ -32,7 +33,7 @@ class PresenceTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): - presence_handler = Mock() + presence_handler = Mock(spec=PresenceHandler) presence_handler.set_state.return_value = defer.succeed(None) hs = self.setup_test_homeserver( @@ -59,12 +60,12 @@ class PresenceTestCase(unittest.HomeserverTestCase): self.assertEqual(channel.code, 200) self.assertEqual(self.hs.get_presence_handler().set_state.call_count, 1) + @unittest.override_config({"use_presence": False}) def test_put_presence_disabled(self): """ PUT to the status endpoint with use_presence disabled will NOT call set_state on the presence handler. """ - self.hs.config.use_presence = False body = {"presence": "here", "status_msg": "beep boop"} channel = self.make_request( -- cgit 1.5.1 From 4e0fd35bc918b6901fcd29371ab6d89db8ce1b5e Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Wed, 28 Apr 2021 11:04:38 +0100 Subject: Revert "Experimental Federation Speedup (#9702)" This reverts commit 05e8c70c059f8ebb066e029bc3aa3e0cefef1019. --- changelog.d/9702.misc | 1 - contrib/experiments/test_messaging.py | 42 +++--- synapse/federation/sender/__init__.py | 145 ++++++++------------- synapse/federation/sender/per_destination_queue.py | 15 +-- synapse/storage/databases/main/transactions.py | 28 ++-- 5 files changed, 93 insertions(+), 138 deletions(-) delete mode 100644 changelog.d/9702.misc (limited to 'synapse/storage/databases/main') diff --git a/changelog.d/9702.misc b/changelog.d/9702.misc deleted file mode 100644 index c6e63450a9..0000000000 --- a/changelog.d/9702.misc +++ /dev/null @@ -1 +0,0 @@ -Speed up federation transmission by using fewer database calls. Contributed by @ShadowJonathan. diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py index 5dd172052b..31b8a68225 100644 --- a/contrib/experiments/test_messaging.py +++ b/contrib/experiments/test_messaging.py @@ -224,16 +224,14 @@ class HomeServer(ReplicationHandler): destinations = yield self.get_servers_for_context(room_name) try: - yield self.replication_layer.send_pdus( - [ - Pdu.create_new( - context=room_name, - pdu_type="sy.room.message", - content={"sender": sender, "body": body}, - origin=self.server_name, - destinations=destinations, - ) - ] + yield self.replication_layer.send_pdu( + Pdu.create_new( + context=room_name, + pdu_type="sy.room.message", + content={"sender": sender, "body": body}, + origin=self.server_name, + destinations=destinations, + ) ) except Exception as e: logger.exception(e) @@ -255,7 +253,7 @@ class HomeServer(ReplicationHandler): origin=self.server_name, destinations=destinations, ) - yield self.replication_layer.send_pdus([pdu]) + yield self.replication_layer.send_pdu(pdu) except Exception as e: logger.exception(e) @@ -267,18 +265,16 @@ class HomeServer(ReplicationHandler): destinations = yield self.get_servers_for_context(room_name) try: - yield self.replication_layer.send_pdus( - [ - Pdu.create_new( - context=room_name, - is_state=True, - pdu_type="sy.room.member", - state_key=invitee, - content={"membership": "invite"}, - origin=self.server_name, - destinations=destinations, - ) - ] + yield self.replication_layer.send_pdu( + Pdu.create_new( + context=room_name, + is_state=True, + pdu_type="sy.room.member", + state_key=invitee, + content={"membership": "invite"}, + origin=self.server_name, + destinations=destinations, + ) ) except Exception as e: logger.exception(e) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 022bbf7dad..deb40f4610 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -14,26 +14,19 @@ import abc import logging -from typing import ( - TYPE_CHECKING, - Collection, - Dict, - Hashable, - Iterable, - List, - Optional, - Set, - Tuple, -) +from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple from prometheus_client import Counter +from twisted.internet import defer + import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender): if not events and next_token >= self._last_poked_id: break - async def get_destinations_for_event( - event: EventBase, - ) -> Collection[str]: - """Computes the destinations to which this event must be sent. - - This returns an empty tuple when there are no destinations to send to, - or if this event is not from this homeserver and it is not sending - it on behalf of another server. - - Will also filter out destinations which this sender is not responsible for, - if multiple federation senders exist. - """ - + async def handle_event(event: EventBase) -> None: # Only send events for this server. send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of() is_mine = self.is_mine_id(event.sender) if not is_mine and send_on_behalf_of is None: - return () + return if not event.internal_metadata.should_proactively_send(): - return () + return destinations = None # type: Optional[Set[str]] if not event.prev_event_ids(): @@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender): "Failed to calculate hosts in room for event: %s", event.event_id, ) - return () + return destinations = { d @@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender): ) } - destinations.discard(self.server_name) - if send_on_behalf_of is not None: # If we are sending the event on behalf of another server # then it already has the event and there is no reason to # send the event to it. destinations.discard(send_on_behalf_of) + logger.debug("Sending %s to %r", event, destinations) + if destinations: + await self._send_pdu(event, destinations) + now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender): "federation_sender" ).observe((now - ts) / 1000) - return destinations - return () - - async def get_federatable_events_and_destinations( - events: Iterable[EventBase], - ) -> List[Tuple[EventBase, Collection[str]]]: - with Measure(self.clock, "get_destinations_for_events"): - # Fetch federation destinations per event, - # skip if get_destinations_for_event returns an empty collection, - # return list of event->destinations pairs. - return [ - (event, dests) - for (event, dests) in [ - (event, await get_destinations_for_event(event)) - for event in events - ] - if dests - ] - - events_and_dests = await get_federatable_events_and_destinations(events) - - # Send corresponding events to each destination queue - await self._distribute_events(events_and_dests) + async def handle_room_events(events: Iterable[EventBase]) -> None: + with Measure(self.clock, "handle_room_events"): + for event in events: + await handle_event(event) + + events_by_room = {} # type: Dict[str, List[EventBase]] + for event in events: + events_by_room.setdefault(event.room_id, []).append(event) + + await make_deferred_yieldable( + defer.gatherResults( + [ + run_in_background(handle_room_events, evs) + for evs in events_by_room.values() + ], + consumeErrors=True, + ) + ) await self.store.update_federation_out_pos("events", next_token) @@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender): events_processed_counter.inc(len(events)) event_processing_loop_room_count.labels("federation_sender").inc( - len({event.room_id for event in events}) + len(events_by_room) ) event_processing_loop_counter.labels("federation_sender").inc() @@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender): finally: self._is_processing = False - async def _distribute_events( - self, - events_and_dests: Iterable[Tuple[EventBase, Collection[str]]], - ) -> None: - """Distribute events to the respective per_destination queues. - - Also persists last-seen per-room stream_ordering to 'destination_rooms'. - - Args: - events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]). - Every event is paired with its intended destinations (in federation). - """ - # Tuples of room_id + destination to their max-seen stream_ordering - room_with_dest_stream_ordering = {} # type: Dict[Tuple[str, str], int] - - # List of events to send to each destination - events_by_dest = {} # type: Dict[str, List[EventBase]] + async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: + # We loop through all destinations to see whether we already have + # a transaction in progress. If we do, stick it in the pending_pdus + # table and we'll get back to it later. - # For each event-destinations pair... - for event, destinations in events_and_dests: + destinations = set(destinations) + destinations.discard(self.server_name) + logger.debug("Sending to: %s", str(destinations)) - # (we got this from the database, it's filled) - assert event.internal_metadata.stream_ordering - - sent_pdus_destination_dist_total.inc(len(destinations)) - sent_pdus_destination_dist_count.inc() + if not destinations: + return - # ...iterate over those destinations.. - for destination in destinations: - # ...update their stream-ordering... - room_with_dest_stream_ordering[(event.room_id, destination)] = max( - event.internal_metadata.stream_ordering, - room_with_dest_stream_ordering.get((event.room_id, destination), 0), - ) + sent_pdus_destination_dist_total.inc(len(destinations)) + sent_pdus_destination_dist_count.inc() - # ...and add the event to each destination queue. - events_by_dest.setdefault(destination, []).append(event) + assert pdu.internal_metadata.stream_ordering - # Bulk-store destination_rooms stream_ids - await self.store.bulk_store_destination_rooms_entries( - room_with_dest_stream_ordering + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, + pdu.room_id, + pdu.internal_metadata.stream_ordering, ) - for destination, pdus in events_by_dest.items(): - logger.debug("Sending %d pdus to %s", len(pdus), destination) - - self._get_per_destination_queue(destination).send_pdus(pdus) + for destination in destinations: + self._get_per_destination_queue(destination).send_pdu(pdu) async def send_read_receipt(self, receipt: ReadReceipt) -> None: """Send a RR to any other servers in the room diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3bb66bce32..3b053ebcfb 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -154,22 +154,19 @@ class PerDestinationQueue: + len(self._pending_edus_keyed) ) - def send_pdus(self, pdus: Iterable[EventBase]) -> None: - """Add PDUs to the queue, and start the transmission loop if necessary + def send_pdu(self, pdu: EventBase) -> None: + """Add a PDU to the queue, and start the transmission loop if necessary Args: - pdus: pdus to send + pdu: pdu to send """ if not self._catching_up or self._last_successful_stream_ordering is None: # only enqueue the PDU if we are not catching up (False) or do not # yet know if we have anything to catch up (None) - self._pending_pdus.extend(pdus) + self._pending_pdus.append(pdu) else: - self._catchup_last_skipped = max( - pdu.internal_metadata.stream_ordering - for pdu in pdus - if pdu.internal_metadata.stream_ordering is not None - ) + assert pdu.internal_metadata.stream_ordering + self._catchup_last_skipped = pdu.internal_metadata.stream_ordering self.attempt_new_transaction() diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index b28ca61f80..82335e7a9d 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ import logging from collections import namedtuple -from typing import Dict, List, Optional, Tuple +from typing import Iterable, List, Optional, Tuple from canonicaljson import encode_canonical_json @@ -295,33 +295,37 @@ class TransactionStore(TransactionWorkerStore): }, ) - async def bulk_store_destination_rooms_entries( - self, room_and_destination_to_ordering: Dict[Tuple[str, str], int] - ): + async def store_destination_rooms_entries( + self, + destinations: Iterable[str], + room_id: str, + stream_ordering: int, + ) -> None: """ - Updates or creates `destination_rooms` entries for a number of events. + Updates or creates `destination_rooms` entries in batch for a single event. Args: - room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id + destinations: list of destinations + room_id: the room_id of the event + stream_ordering: the stream_ordering of the event """ await self.db_pool.simple_upsert_many( table="destinations", key_names=("destination",), - key_values={(d,) for _, d in room_and_destination_to_ordering.keys()}, + key_values=[(d,) for d in destinations], value_names=[], value_values=[], desc="store_destination_rooms_entries_dests", ) + rows = [(destination, room_id) for destination in destinations] await self.db_pool.simple_upsert_many( table="destination_rooms", - key_names=("room_id", "destination"), - key_values=list(room_and_destination_to_ordering.keys()), + key_names=("destination", "room_id"), + key_values=rows, value_names=["stream_ordering"], - value_values=[ - (stream_id,) for stream_id in room_and_destination_to_ordering.values() - ], + value_values=[(stream_ordering,)] * len(rows), desc="store_destination_rooms_entries_rooms", ) -- cgit 1.5.1