From 54317d34b76adb1e8f694acd91f631b3abe38947 Mon Sep 17 00:00:00 2001 From: Alexander Fechler <141915399+afechler@users.noreply.github.com> Date: Fri, 18 Aug 2023 13:26:38 +0200 Subject: Allow filtering for admins in the list accounts admin API (#16114) --- tests/rest/admin/test_user.py | 38 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) (limited to 'tests') diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py index 41a959b4d6..feb81844ae 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py @@ -879,6 +879,44 @@ class UsersListTestCase(unittest.HomeserverTestCase): self._order_test([self.admin_user, user1, user2], "creation_ts", "f") self._order_test([user2, user1, self.admin_user], "creation_ts", "b") + def test_filter_admins(self) -> None: + """ + Tests whether the various values of the query parameter `admins` lead to the + expected result set. + """ + + # Register an additional non admin user + self.register_user("user", "pass", admin=False) + + # Query all users + channel = self.make_request( + "GET", + f"{self.url}", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, channel.result) + self.assertEqual(2, channel.json_body["total"]) + + # Query only admin users + channel = self.make_request( + "GET", + f"{self.url}?admins=true", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, channel.result) + self.assertEqual(1, channel.json_body["total"]) + self.assertEqual(1, channel.json_body["users"][0]["admin"]) + + # Query only non admin users + channel = self.make_request( + "GET", + f"{self.url}?admins=false", + access_token=self.admin_user_tok, + ) + self.assertEqual(200, channel.code, channel.result) + self.assertEqual(1, channel.json_body["total"]) + self.assertFalse(channel.json_body["users"][0]["admin"]) + @override_config( { "experimental_features": { -- cgit 1.5.1 From 2d15e396843879bb514a148097cbddf10f50655c Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Fri, 18 Aug 2023 15:46:46 +0200 Subject: MSC3861: allow impersonation by an admin using a query param (#16132) --- changelog.d/16132.misc | 1 + synapse/api/auth/msc3861_delegated.py | 25 ++++++++++++++++++++--- tests/handlers/test_oauth_delegation.py | 35 +++++++++++++++++++++++++++++++++ 3 files changed, 58 insertions(+), 3 deletions(-) create mode 100644 changelog.d/16132.misc (limited to 'tests') diff --git a/changelog.d/16132.misc b/changelog.d/16132.misc new file mode 100644 index 0000000000..aca26079d8 --- /dev/null +++ b/changelog.d/16132.misc @@ -0,0 +1 @@ +MSC3861: allow impersonation by an admin user using `_oidc_admin_impersonate_user_id` query parameter. diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 18875f2c81..4bdfe31b22 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -246,7 +246,7 @@ class MSC3861DelegatedAuth(BaseAuth): return introspection_token async def is_server_admin(self, requester: Requester) -> bool: - return "urn:synapse:admin:*" in requester.scope + return SCOPE_SYNAPSE_ADMIN in requester.scope async def get_user_by_req( self, @@ -263,6 +263,25 @@ class MSC3861DelegatedAuth(BaseAuth): # so that we don't provision the user if they don't have enough permission: requester = await self.get_user_by_access_token(access_token, allow_expired) + # Allow impersonation by an admin user using `_oidc_admin_impersonate_user_id` query parameter + if request.args is not None: + user_id_params = request.args.get(b"_oidc_admin_impersonate_user_id") + if user_id_params: + if await self.is_server_admin(requester): + user_id_str = user_id_params[0].decode("ascii") + impersonated_user_id = UserID.from_string(user_id_str) + logging.info(f"Admin impersonation of user {user_id_str}") + requester = create_requester( + user_id=impersonated_user_id, + scope=[SCOPE_MATRIX_API], + authenticated_entity=requester.user.to_string(), + ) + else: + raise AuthError( + 401, + "Impersonation not possible by a non admin user", + ) + # Deny the request if the user account is locked. if not allow_locked and await self.store.get_user_locked_status( requester.user.to_string() @@ -290,14 +309,14 @@ class MSC3861DelegatedAuth(BaseAuth): # XXX: This is a temporary solution so that the admin API can be called by # the OIDC provider. This will be removed once we have OIDC client # credentials grant support in matrix-authentication-service. - logging.info("Admin toked used") + logging.info("Admin token used") # XXX: that user doesn't exist and won't be provisioned. # This is mostly fine for admin calls, but we should also think about doing # requesters without a user_id. admin_user = UserID("__oidc_admin", self._hostname) return create_requester( user_id=admin_user, - scope=["urn:synapse:admin:*"], + scope=[SCOPE_SYNAPSE_ADMIN], ) try: diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py index 82c26e303f..1456b675a7 100644 --- a/tests/handlers/test_oauth_delegation.py +++ b/tests/handlers/test_oauth_delegation.py @@ -340,6 +340,41 @@ class MSC3861OAuthDelegation(HomeserverTestCase): get_awaitable_result(self.auth.is_server_admin(requester)), False ) + def test_active_user_admin_impersonation(self) -> None: + """The handler should return a requester with normal user rights + and an user ID matching the one specified in query param `user_id`""" + + self.http_client.request = simple_async_mock( + return_value=FakeResponse.json( + code=200, + payload={ + "active": True, + "sub": SUBJECT, + "scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]), + "username": USERNAME, + }, + ) + ) + request = Mock(args={}) + request.args[b"access_token"] = [b"mockAccessToken"] + impersonated_user_id = f"@{USERNAME}:{SERVER_NAME}" + request.args[b"_oidc_admin_impersonate_user_id"] = [ + impersonated_user_id.encode("ascii") + ] + request.requestHeaders.getRawHeaders = mock_getRawHeaders() + requester = self.get_success(self.auth.get_user_by_req(request)) + self.http_client.get_json.assert_called_once_with(WELL_KNOWN) + self.http_client.request.assert_called_once_with( + method="POST", uri=INTROSPECTION_ENDPOINT, data=ANY, headers=ANY + ) + self._assertParams() + self.assertEqual(requester.user.to_string(), impersonated_user_id) + self.assertEqual(requester.is_guest, False) + self.assertEqual(requester.device_id, None) + self.assertEqual( + get_awaitable_result(self.auth.is_server_admin(requester)), False + ) + def test_active_user_with_device(self) -> None: """The handler should return a requester with normal user rights and a device ID.""" -- cgit 1.5.1 From bd558a6dc369b6f5d06ab6fd2500faa216a45883 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 18 Aug 2023 15:32:06 +0100 Subject: Speed up state res in rare case we don't have all events (#16116) If we don't have all the auth events in a room then not all state events will have a chain cover index. Even so, we can still use the chain cover index on the events that do have it, rather than bailing and using the slower functions. This situation should not arise for newly persisted rooms, as we check we have the full auth chain for each event, but can happen for existing rooms. c.f. #15245 --- changelog.d/16116.bugfix | 1 + synapse/storage/databases/main/event_federation.py | 184 ++++++++++++++-- tests/storage/test_event_federation.py | 241 +++++++++++++++++---- 3 files changed, 355 insertions(+), 71 deletions(-) create mode 100644 changelog.d/16116.bugfix (limited to 'tests') diff --git a/changelog.d/16116.bugfix b/changelog.d/16116.bugfix new file mode 100644 index 0000000000..f57a26ae39 --- /dev/null +++ b/changelog.d/16116.bugfix @@ -0,0 +1 @@ +Fix performance of state resolutions for large, old rooms that did not have the full auth chain persisted. diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 534dc32413..fab7008a8f 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -452,33 +452,56 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # sets. seen_chains: Set[int] = set() - sql = """ - SELECT event_id, chain_id, sequence_number - FROM event_auth_chains - WHERE %s - """ - for batch in batch_iter(initial_events, 1000): - clause, args = make_in_list_sql_clause( - txn.database_engine, "event_id", batch - ) - txn.execute(sql % (clause,), args) + # Fetch the chain cover index for the initial set of events we're + # considering. + def fetch_chain_info(events_to_fetch: Collection[str]) -> None: + sql = """ + SELECT event_id, chain_id, sequence_number + FROM event_auth_chains + WHERE %s + """ + for batch in batch_iter(events_to_fetch, 1000): + clause, args = make_in_list_sql_clause( + txn.database_engine, "event_id", batch + ) + txn.execute(sql % (clause,), args) - for event_id, chain_id, sequence_number in txn: - chain_info[event_id] = (chain_id, sequence_number) - seen_chains.add(chain_id) - chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id + for event_id, chain_id, sequence_number in txn: + chain_info[event_id] = (chain_id, sequence_number) + seen_chains.add(chain_id) + chain_to_event.setdefault(chain_id, {})[sequence_number] = event_id + + fetch_chain_info(initial_events) # Check that we actually have a chain ID for all the events. events_missing_chain_info = initial_events.difference(chain_info) + + # The result set to return, i.e. the auth chain difference. + result: Set[str] = set() + if events_missing_chain_info: - # This can happen due to e.g. downgrade/upgrade of the server. We - # raise an exception and fall back to the previous algorithm. - logger.info( - "Unexpectedly found that events don't have chain IDs in room %s: %s", + # For some reason we have events we haven't calculated the chain + # index for, so we need to handle those separately. This should only + # happen for older rooms where the server doesn't have all the auth + # events. + result = self._fixup_auth_chain_difference_sets( + txn, room_id, - events_missing_chain_info, + state_sets=state_sets, + events_missing_chain_info=events_missing_chain_info, + events_that_have_chain_index=chain_info, ) - raise _NoChainCoverIndex(room_id) + + # We now need to refetch any events that we have added to the state + # sets. + new_events_to_fetch = { + event_id + for state_set in state_sets + for event_id in state_set + if event_id not in initial_events + } + + fetch_chain_info(new_events_to_fetch) # Corresponds to `state_sets`, except as a map from chain ID to max # sequence number reachable from the state set. @@ -487,8 +510,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas chains: Dict[int, int] = {} set_to_chain.append(chains) - for event_id in state_set: - chain_id, seq_no = chain_info[event_id] + for state_id in state_set: + chain_id, seq_no = chain_info[state_id] chains[chain_id] = max(seq_no, chains.get(chain_id, 0)) @@ -532,7 +555,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # from *any* state set and the minimum sequence number reachable from # *all* state sets. Events in that range are in the auth chain # difference. - result = set() # Mapping from chain ID to the range of sequence numbers that should be # pulled from the database. @@ -588,6 +610,122 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas return result + def _fixup_auth_chain_difference_sets( + self, + txn: LoggingTransaction, + room_id: str, + state_sets: List[Set[str]], + events_missing_chain_info: Set[str], + events_that_have_chain_index: Collection[str], + ) -> Set[str]: + """Helper for `_get_auth_chain_difference_using_cover_index_txn` to + handle the case where we haven't calculated the chain cover index for + all events. + + This modifies `state_sets` so that they only include events that have a + chain cover index, and returns a set of event IDs that are part of the + auth difference. + """ + + # This works similarly to the handling of unpersisted events in + # `synapse.state.v2_get_auth_chain_difference`. We uses the observation + # that if you can split the set of events into two classes X and Y, + # where no events in Y have events in X in their auth chain, then we can + # calculate the auth difference by considering X and Y separately. + # + # We do this in three steps: + # 1. Compute the set of events without chain cover index belonging to + # the auth difference. + # 2. Replacing the un-indexed events in the state_sets with their auth + # events, recursively, until the state_sets contain only indexed + # events. We can then calculate the auth difference of those state + # sets using the chain cover index. + # 3. Add the results of 1 and 2 together. + + # By construction we know that all events that we haven't persisted the + # chain cover index for are contained in + # `event_auth_chain_to_calculate`, so we pull out the events from those + # rather than doing recursive queries to walk the auth chain. + # + # We pull out those events with their auth events, which gives us enough + # information to construct the auth chain of an event up to auth events + # that have the chain cover index. + sql = """ + SELECT tc.event_id, ea.auth_id, eac.chain_id IS NOT NULL + FROM event_auth_chain_to_calculate AS tc + LEFT JOIN event_auth AS ea USING (event_id) + LEFT JOIN event_auth_chains AS eac ON (ea.auth_id = eac.event_id) + WHERE tc.room_id = ? + """ + txn.execute(sql, (room_id,)) + event_to_auth_ids: Dict[str, Set[str]] = {} + events_that_have_chain_index = set(events_that_have_chain_index) + for event_id, auth_id, auth_id_has_chain in txn: + s = event_to_auth_ids.setdefault(event_id, set()) + if auth_id is not None: + s.add(auth_id) + if auth_id_has_chain: + events_that_have_chain_index.add(auth_id) + + if events_missing_chain_info - event_to_auth_ids.keys(): + # Uh oh, we somehow haven't correctly done the chain cover index, + # bail and fall back to the old method. + logger.info( + "Unexpectedly found that events don't have chain IDs in room %s: %s", + room_id, + events_missing_chain_info - event_to_auth_ids.keys(), + ) + raise _NoChainCoverIndex(room_id) + + # Create a map from event IDs we care about to their partial auth chain. + event_id_to_partial_auth_chain: Dict[str, Set[str]] = {} + for event_id, auth_ids in event_to_auth_ids.items(): + if not any(event_id in state_set for state_set in state_sets): + continue + + processing = set(auth_ids) + to_add = set() + while processing: + auth_id = processing.pop() + to_add.add(auth_id) + + sub_auth_ids = event_to_auth_ids.get(auth_id) + if sub_auth_ids is None: + continue + + processing.update(sub_auth_ids - to_add) + + event_id_to_partial_auth_chain[event_id] = to_add + + # Now we do two things: + # 1. Update the state sets to only include indexed events; and + # 2. Create a new list containing the auth chains of the un-indexed + # events + unindexed_state_sets: List[Set[str]] = [] + for state_set in state_sets: + unindexed_state_set = set() + for event_id, auth_chain in event_id_to_partial_auth_chain.items(): + if event_id not in state_set: + continue + + unindexed_state_set.add(event_id) + + state_set.discard(event_id) + state_set.difference_update(auth_chain) + for auth_id in auth_chain: + if auth_id in events_that_have_chain_index: + state_set.add(auth_id) + else: + unindexed_state_set.add(auth_id) + + unindexed_state_sets.append(unindexed_state_set) + + # Calculate and return the auth difference of the un-indexed events. + union = unindexed_state_sets[0].union(*unindexed_state_sets[1:]) + intersection = unindexed_state_sets[0].intersection(*unindexed_state_sets[1:]) + + return union - intersection + def _get_auth_chain_difference_txn( self, txn: LoggingTransaction, state_sets: List[Set[str]] ) -> Set[str]: diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 9c151a5e62..7a4ecab2d5 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -13,7 +13,19 @@ # limitations under the License. import datetime -from typing import Dict, List, Tuple, Union, cast +from typing import ( + Collection, + Dict, + FrozenSet, + Iterable, + List, + Mapping, + Set, + Tuple, + TypeVar, + Union, + cast, +) import attr from parameterized import parameterized @@ -38,6 +50,138 @@ from synapse.util import Clock, json_encoder import tests.unittest import tests.utils +# The silly auth graph we use to test the auth difference algorithm, +# where the top are the most recent events. +# +# A B +# \ / +# D E +# \ | +# ` F C +# | /| +# G ´ | +# | \ | +# H I +# | | +# K J + +AUTH_GRAPH: Dict[str, List[str]] = { + "a": ["e"], + "b": ["e"], + "c": ["g", "i"], + "d": ["f"], + "e": ["f"], + "f": ["g"], + "g": ["h", "i"], + "h": ["k"], + "i": ["j"], + "k": [], + "j": [], +} + +DEPTH_GRAPH = { + "a": 7, + "b": 7, + "c": 4, + "d": 6, + "e": 6, + "f": 5, + "g": 3, + "h": 2, + "i": 2, + "k": 1, + "j": 1, +} + +T = TypeVar("T") + + +def get_all_topologically_sorted_orders( + nodes: Iterable[T], + graph: Mapping[T, Collection[T]], +) -> List[List[T]]: + """Given a set of nodes and a graph, return all possible topological + orderings. + """ + + # This is implemented by Kahn's algorithm, and forking execution each time + # we have a choice over which node to consider next. + + degree_map = {node: 0 for node in nodes} + reverse_graph: Dict[T, Set[T]] = {} + + for node, edges in graph.items(): + if node not in degree_map: + continue + + for edge in set(edges): + if edge in degree_map: + degree_map[node] += 1 + + reverse_graph.setdefault(edge, set()).add(node) + reverse_graph.setdefault(node, set()) + + zero_degree = [node for node, degree in degree_map.items() if degree == 0] + + return _get_all_topologically_sorted_orders_inner( + reverse_graph, zero_degree, degree_map + ) + + +def _get_all_topologically_sorted_orders_inner( + reverse_graph: Dict[T, Set[T]], + zero_degree: List[T], + degree_map: Dict[T, int], +) -> List[List[T]]: + new_paths = [] + + # Rather than only choosing *one* item from the list of nodes with zero + # degree, we "fork" execution and run the algorithm for each node in the + # zero degree. + for node in zero_degree: + new_degree_map = degree_map.copy() + new_zero_degree = zero_degree.copy() + new_zero_degree.remove(node) + + for edge in reverse_graph.get(node, []): + if edge in new_degree_map: + new_degree_map[edge] -= 1 + if new_degree_map[edge] == 0: + new_zero_degree.append(edge) + + paths = _get_all_topologically_sorted_orders_inner( + reverse_graph, new_zero_degree, new_degree_map + ) + for path in paths: + path.insert(0, node) + + new_paths.extend(paths) + + if not new_paths: + return [[]] + + return new_paths + + +def get_all_topologically_consistent_subsets( + nodes: Iterable[T], + graph: Mapping[T, Collection[T]], +) -> Set[FrozenSet[T]]: + """Get all subsets of the graph where if node N is in the subgraph, then all + nodes that can reach that node (i.e. for all X there exists a path X -> N) + are in the subgraph. + """ + all_topological_orderings = get_all_topologically_sorted_orders(nodes, graph) + + graph_subsets = set() + for ordering in all_topological_orderings: + ordering.reverse() + + for idx in range(len(ordering)): + graph_subsets.add(frozenset(ordering[:idx])) + + return graph_subsets + @attr.s(auto_attribs=True, frozen=True, slots=True) class _BackfillSetupInfo: @@ -172,49 +316,6 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): def _setup_auth_chain(self, use_chain_cover_index: bool) -> str: room_id = "@ROOM:local" - # The silly auth graph we use to test the auth difference algorithm, - # where the top are the most recent events. - # - # A B - # \ / - # D E - # \ | - # ` F C - # | /| - # G ´ | - # | \ | - # H I - # | | - # K J - - auth_graph: Dict[str, List[str]] = { - "a": ["e"], - "b": ["e"], - "c": ["g", "i"], - "d": ["f"], - "e": ["f"], - "f": ["g"], - "g": ["h", "i"], - "h": ["k"], - "i": ["j"], - "k": [], - "j": [], - } - - depth_map = { - "a": 7, - "b": 7, - "c": 4, - "d": 6, - "e": 6, - "f": 5, - "g": 3, - "h": 2, - "i": 2, - "k": 1, - "j": 1, - } - # Mark the room as maybe having a cover index. def store_room(txn: LoggingTransaction) -> None: @@ -238,9 +339,9 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): def insert_event(txn: LoggingTransaction) -> None: stream_ordering = 0 - for event_id in auth_graph: + for event_id in AUTH_GRAPH: stream_ordering += 1 - depth = depth_map[event_id] + depth = DEPTH_GRAPH[event_id] self.store.db_pool.simple_insert_txn( txn, @@ -260,8 +361,8 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): self.persist_events._persist_event_auth_chain_txn( txn, [ - cast(EventBase, FakeEvent(event_id, room_id, auth_graph[event_id])) - for event_id in auth_graph + cast(EventBase, FakeEvent(event_id, room_id, AUTH_GRAPH[event_id])) + for event_id in AUTH_GRAPH ], ) @@ -344,7 +445,51 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): room_id = self._setup_auth_chain(use_chain_cover_index) # Now actually test that various combinations give the right result: + self.assert_auth_diff_is_expected(room_id) + + @parameterized.expand( + [ + [graph_subset] + for graph_subset in get_all_topologically_consistent_subsets( + AUTH_GRAPH, AUTH_GRAPH + ) + ] + ) + def test_auth_difference_partial(self, graph_subset: Collection[str]) -> None: + """Test that if we only have a chain cover index on a partial subset of + the room we still get the correct auth chain difference. + + We do this by removing the chain cover index for every valid subset of the + graph. + """ + room_id = self._setup_auth_chain(True) + + for event_id in graph_subset: + # Remove chain cover from that event. + self.get_success( + self.store.db_pool.simple_delete( + table="event_auth_chains", + keyvalues={"event_id": event_id}, + desc="test_auth_difference_partial_remove", + ) + ) + self.get_success( + self.store.db_pool.simple_insert( + table="event_auth_chain_to_calculate", + values={ + "event_id": event_id, + "room_id": room_id, + "type": "", + "state_key": "", + }, + desc="test_auth_difference_partial_remove", + ) + ) + + self.assert_auth_diff_is_expected(room_id) + def assert_auth_diff_is_expected(self, room_id: str) -> None: + """Assert the auth chain difference returns the correct answers.""" difference = self.get_success( self.store.get_auth_chain_difference(room_id, [{"a"}, {"b"}]) ) -- cgit 1.5.1 From 358896e1b835bf693ef40d4cf9f10077432e935b Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Mon, 21 Aug 2023 14:17:13 +0200 Subject: Implements a task scheduler for resumable potentially long running tasks (#15891) --- changelog.d/15891.feature | 1 + synapse/app/generic_worker.py | 2 + synapse/server.py | 7 +- synapse/storage/databases/main/__init__.py | 2 + synapse/storage/databases/main/task_scheduler.py | 202 ++++++++++++ synapse/storage/schema/__init__.py | 1 + .../schema/main/delta/80/02_scheduled_tasks.sql | 28 ++ synapse/types/__init__.py | 39 +++ synapse/util/task_scheduler.py | 364 +++++++++++++++++++++ tests/util/test_task_scheduler.py | 186 +++++++++++ 10 files changed, 831 insertions(+), 1 deletion(-) create mode 100644 changelog.d/15891.feature create mode 100644 synapse/storage/databases/main/task_scheduler.py create mode 100644 synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql create mode 100644 synapse/util/task_scheduler.py create mode 100644 tests/util/test_task_scheduler.py (limited to 'tests') diff --git a/changelog.d/15891.feature b/changelog.d/15891.feature new file mode 100644 index 0000000000..5024b5adc4 --- /dev/null +++ b/changelog.d/15891.feature @@ -0,0 +1 @@ +Implements a task scheduler for resumable potentially long running tasks. diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py index dc79efcc14..d25e3548e0 100644 --- a/synapse/app/generic_worker.py +++ b/synapse/app/generic_worker.py @@ -91,6 +91,7 @@ from synapse.storage.databases.main.state import StateGroupWorkerStore from synapse.storage.databases.main.stats import StatsStore from synapse.storage.databases.main.stream import StreamWorkerStore from synapse.storage.databases.main.tags import TagsWorkerStore +from synapse.storage.databases.main.task_scheduler import TaskSchedulerWorkerStore from synapse.storage.databases.main.transactions import TransactionWorkerStore from synapse.storage.databases.main.ui_auth import UIAuthWorkerStore from synapse.storage.databases.main.user_directory import UserDirectoryStore @@ -144,6 +145,7 @@ class GenericWorkerStore( TransactionWorkerStore, LockStore, SessionStore, + TaskSchedulerWorkerStore, ): # Properties that multiple storage classes define. Tell mypy what the # expected type is. diff --git a/synapse/server.py b/synapse/server.py index e753ff0377..7cdd3ea3c2 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -142,6 +142,7 @@ from synapse.util.distributor import Distributor from synapse.util.macaroons import MacaroonGenerator from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.stringutils import random_string +from synapse.util.task_scheduler import TaskScheduler logger = logging.getLogger(__name__) @@ -360,6 +361,7 @@ class HomeServer(metaclass=abc.ABCMeta): """ for i in self.REQUIRED_ON_BACKGROUND_TASK_STARTUP: getattr(self, "get_" + i + "_handler")() + self.get_task_scheduler() def get_reactor(self) -> ISynapseReactor: """ @@ -912,6 +914,9 @@ class HomeServer(metaclass=abc.ABCMeta): """Usage metrics shared between phone home stats and the prometheus exporter.""" return CommonUsageMetricsManager(self) - @cache_in_self def get_worker_locks_handler(self) -> WorkerLocksHandler: return WorkerLocksHandler(self) + + @cache_in_self + def get_task_scheduler(self) -> TaskScheduler: + return TaskScheduler(self) diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index e17f25e87a..a85633efcd 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -70,6 +70,7 @@ from .state import StateStore from .stats import StatsStore from .stream import StreamWorkerStore from .tags import TagsStore +from .task_scheduler import TaskSchedulerWorkerStore from .transactions import TransactionWorkerStore from .ui_auth import UIAuthStore from .user_directory import UserDirectoryStore @@ -127,6 +128,7 @@ class DataStore( CacheInvalidationWorkerStore, LockStore, SessionStore, + TaskSchedulerWorkerStore, ): def __init__( self, diff --git a/synapse/storage/databases/main/task_scheduler.py b/synapse/storage/databases/main/task_scheduler.py new file mode 100644 index 0000000000..1fb3180c3c --- /dev/null +++ b/synapse/storage/databases/main/task_scheduler.py @@ -0,0 +1,202 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import TYPE_CHECKING, Any, Dict, List, Optional + +from synapse.storage._base import SQLBaseStore, db_to_json +from synapse.storage.database import ( + DatabasePool, + LoggingDatabaseConnection, + LoggingTransaction, + make_in_list_sql_clause, +) +from synapse.types import JsonDict, JsonMapping, ScheduledTask, TaskStatus +from synapse.util import json_encoder + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class TaskSchedulerWorkerStore(SQLBaseStore): + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + @staticmethod + def _convert_row_to_task(row: Dict[str, Any]) -> ScheduledTask: + row["status"] = TaskStatus(row["status"]) + if row["params"] is not None: + row["params"] = db_to_json(row["params"]) + if row["result"] is not None: + row["result"] = db_to_json(row["result"]) + return ScheduledTask(**row) + + async def get_scheduled_tasks( + self, + *, + actions: Optional[List[str]] = None, + resource_id: Optional[str] = None, + statuses: Optional[List[TaskStatus]] = None, + max_timestamp: Optional[int] = None, + ) -> List[ScheduledTask]: + """Get a list of scheduled tasks from the DB. + + Args: + actions: Limit the returned tasks to those specific action names + resource_id: Limit the returned tasks to the specific resource id, if specified + statuses: Limit the returned tasks to the specific statuses + max_timestamp: Limit the returned tasks to the ones that have + a timestamp inferior to the specified one + + Returns: a list of `ScheduledTask`, ordered by increasing timestamps + """ + + def get_scheduled_tasks_txn(txn: LoggingTransaction) -> List[Dict[str, Any]]: + clauses: List[str] = [] + args: List[Any] = [] + if resource_id: + clauses.append("resource_id = ?") + args.append(resource_id) + if actions is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "action", actions + ) + clauses.append(clause) + args.extend(temp_args) + if statuses is not None: + clause, temp_args = make_in_list_sql_clause( + txn.database_engine, "status", statuses + ) + clauses.append(clause) + args.extend(temp_args) + if max_timestamp is not None: + clauses.append("timestamp <= ?") + args.append(max_timestamp) + + sql = "SELECT * FROM scheduled_tasks" + if clauses: + sql = sql + " WHERE " + " AND ".join(clauses) + + sql = sql + "ORDER BY timestamp" + + txn.execute(sql, args) + return self.db_pool.cursor_to_dict(txn) + + rows = await self.db_pool.runInteraction( + "get_scheduled_tasks", get_scheduled_tasks_txn + ) + return [TaskSchedulerWorkerStore._convert_row_to_task(row) for row in rows] + + async def insert_scheduled_task(self, task: ScheduledTask) -> None: + """Insert a specified `ScheduledTask` in the DB. + + Args: + task: the `ScheduledTask` to insert + """ + await self.db_pool.simple_insert( + "scheduled_tasks", + { + "id": task.id, + "action": task.action, + "status": task.status, + "timestamp": task.timestamp, + "resource_id": task.resource_id, + "params": None + if task.params is None + else json_encoder.encode(task.params), + "result": None + if task.result is None + else json_encoder.encode(task.result), + "error": task.error, + }, + desc="insert_scheduled_task", + ) + + async def update_scheduled_task( + self, + id: str, + timestamp: int, + *, + status: Optional[TaskStatus] = None, + result: Optional[JsonMapping] = None, + error: Optional[str] = None, + ) -> bool: + """Update a scheduled task in the DB with some new value(s). + + Args: + id: id of the `ScheduledTask` to update + timestamp: new timestamp of the task + status: new status of the task + result: new result of the task + error: new error of the task + + Returns: `False` if no matching row was found, `True` otherwise + """ + updatevalues: JsonDict = {"timestamp": timestamp} + if status is not None: + updatevalues["status"] = status + if result is not None: + updatevalues["result"] = json_encoder.encode(result) + if error is not None: + updatevalues["error"] = error + nb_rows = await self.db_pool.simple_update( + "scheduled_tasks", + {"id": id}, + updatevalues, + desc="update_scheduled_task", + ) + return nb_rows > 0 + + async def get_scheduled_task(self, id: str) -> Optional[ScheduledTask]: + """Get a specific `ScheduledTask` from its id. + + Args: + id: the id of the task to retrieve + + Returns: the task if available, `None` otherwise + """ + row = await self.db_pool.simple_select_one( + table="scheduled_tasks", + keyvalues={"id": id}, + retcols=( + "id", + "action", + "status", + "timestamp", + "resource_id", + "params", + "result", + "error", + ), + allow_none=True, + desc="get_scheduled_task", + ) + + return TaskSchedulerWorkerStore._convert_row_to_task(row) if row else None + + async def delete_scheduled_task(self, id: str) -> None: + """Delete a specific task from its id. + + Args: + id: the id of the task to delete + """ + await self.db_pool.simple_delete( + "scheduled_tasks", + keyvalues={"id": id}, + desc="delete_scheduled_task", + ) diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 7de9949a5b..649d3c8e9f 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -113,6 +113,7 @@ Changes in SCHEMA_VERSION = 79 Changes in SCHEMA_VERSION = 80 - The event_txn_id_device_id is always written to for new events. + - Add tables for the task scheduler. """ diff --git a/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql b/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql new file mode 100644 index 0000000000..286d109ed7 --- /dev/null +++ b/synapse/storage/schema/main/delta/80/02_scheduled_tasks.sql @@ -0,0 +1,28 @@ +/* Copyright 2023 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- cf ScheduledTask docstring for the meaning of the fields. +CREATE TABLE IF NOT EXISTS scheduled_tasks( + id TEXT PRIMARY KEY, + action TEXT NOT NULL, + status TEXT NOT NULL, + timestamp BIGINT NOT NULL, + resource_id TEXT, + params TEXT, + result TEXT, + error TEXT +); + +CREATE INDEX IF NOT EXISTS scheduled_tasks_status ON scheduled_tasks(status); diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index 073f682aca..e750417189 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -15,6 +15,7 @@ import abc import re import string +from enum import Enum from typing import ( TYPE_CHECKING, AbstractSet, @@ -969,3 +970,41 @@ class UserProfile(TypedDict): class RetentionPolicy: min_lifetime: Optional[int] = None max_lifetime: Optional[int] = None + + +class TaskStatus(str, Enum): + """Status of a scheduled task""" + + # Task is scheduled but not active + SCHEDULED = "scheduled" + # Task is active and probably running, and if not + # will be run on next scheduler loop run + ACTIVE = "active" + # Task has completed successfully + COMPLETE = "complete" + # Task is over and either returned a failed status, or had an exception + FAILED = "failed" + + +@attr.s(auto_attribs=True, frozen=True, slots=True) +class ScheduledTask: + """Description of a scheduled task""" + + # Id used to identify the task + id: str + # Name of the action to be run by this task + action: str + # Current status of this task + status: TaskStatus + # If the status is SCHEDULED then this represents when it should be launched, + # otherwise it represents the last time this task got a change of state. + # In milliseconds since epoch in system time timezone, usually UTC. + timestamp: int + # Optionally bind a task to some resource id for easy retrieval + resource_id: Optional[str] + # Optional parameters that will be passed to the function ran by the task + params: Optional[JsonMapping] + # Optional result that can be updated by the running task + result: Optional[JsonMapping] + # Optional error that should be assigned a value when the status is FAILED + error: Optional[str] diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py new file mode 100644 index 0000000000..773a8327f6 --- /dev/null +++ b/synapse/util/task_scheduler.py @@ -0,0 +1,364 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Set, Tuple + +from prometheus_client import Gauge + +from twisted.python.failure import Failure + +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import JsonMapping, ScheduledTask, TaskStatus +from synapse.util.stringutils import random_string + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +running_tasks_gauge = Gauge( + "synapse_scheduler_running_tasks", + "The number of concurrent running tasks handled by the TaskScheduler", +) + + +class TaskScheduler: + """ + This is a simple task sheduler aimed at resumable tasks: usually we use `run_in_background` + to launch a background task, or Twisted `deferLater` if we want to do so later on. + + The problem with that is that the tasks will just stop and never be resumed if synapse + is stopped for whatever reason. + + How this works: + - A function mapped to a named action should first be registered with `register_action`. + This function will be called when trying to resuming tasks after a synapse shutdown, + so this registration should happen when synapse is initialised, NOT right before scheduling + a task. + - A task can then be launched using this named action with `schedule_task`. A `params` dict + can be passed, and it will be available to the registered function when launched. This task + can be launch either now-ish, or later on by giving a `timestamp` parameter. + + The function may call `update_task` at any time to update the `result` of the task, + and this can be used to resume the task at a specific point and/or to convey a result to + the code launching the task. + You can also specify the `result` (and/or an `error`) when returning from the function. + + The reconciliation loop runs every 5 mns, so this is not a precise scheduler. When wanting + to launch now, the launch will still not happen before the next loop run. + + Tasks will be run on the worker specified with `run_background_tasks_on` config, + or the main one by default. + There is a limit of 10 concurrent tasks, so tasks may be delayed if the pool is already + full. In this regard, please take great care that scheduled tasks can actually finished. + For now there is no mechanism to stop a running task if it is stuck. + """ + + # Precision of the scheduler, evaluation of tasks to run will only happen + # every `SCHEDULE_INTERVAL_MS` ms + SCHEDULE_INTERVAL_MS = 1 * 60 * 1000 # 1mn + # Time before a complete or failed task is deleted from the DB + KEEP_TASKS_FOR_MS = 7 * 24 * 60 * 60 * 1000 # 1 week + # Maximum number of tasks that can run at the same time + MAX_CONCURRENT_RUNNING_TASKS = 10 + # Time from the last task update after which we will log a warning + LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs + + def __init__(self, hs: "HomeServer"): + self._store = hs.get_datastores().main + self._clock = hs.get_clock() + self._running_tasks: Set[str] = set() + # A map between action names and their registered function + self._actions: Dict[ + str, + Callable[ + [ScheduledTask, bool], + Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], + ], + ] = {} + self._run_background_tasks = hs.config.worker.run_background_tasks + + if self._run_background_tasks: + self._clock.looping_call( + run_as_background_process, + TaskScheduler.SCHEDULE_INTERVAL_MS, + "handle_scheduled_tasks", + self._handle_scheduled_tasks, + ) + + def register_action( + self, + function: Callable[ + [ScheduledTask, bool], + Awaitable[Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]], + ], + action_name: str, + ) -> None: + """Register a function to be executed when an action is scheduled with + the specified action name. + + Actions need to be registered as early as possible so that a resumed action + can find its matching function. It's usually better to NOT do that right before + calling `schedule_task` but rather in an `__init__` method. + + Args: + function: The function to be executed for this action. The parameters + passed to the function when launched are the `ScheduledTask` being run, + and a `first_launch` boolean to signal if it's a resumed task or the first + launch of it. The function should return a tuple of new `status`, `result` + and `error` as specified in `ScheduledTask`. + action_name: The name of the action to be associated with the function + """ + self._actions[action_name] = function + + async def schedule_task( + self, + action: str, + *, + resource_id: Optional[str] = None, + timestamp: Optional[int] = None, + params: Optional[JsonMapping] = None, + ) -> str: + """Schedule a new potentially resumable task. A function matching the specified + `action` should have been previously registered with `register_action`. + + Args: + action: the name of a previously registered action + resource_id: a task can be associated with a resource id to facilitate + getting all tasks associated with a specific resource + timestamp: if `None`, the task will be launched as soon as possible, otherwise it + will be launch as soon as possible after the `timestamp` value. + Note that this scheduler is not meant to be precise, and the scheduling + could be delayed if too many tasks are already running + params: a set of parameters that can be easily accessed from inside the + executed function + + Returns: + The id of the scheduled task + """ + if action not in self._actions: + raise Exception( + f"No function associated with action {action} of the scheduled task" + ) + + if timestamp is None or timestamp < self._clock.time_msec(): + timestamp = self._clock.time_msec() + + task = ScheduledTask( + random_string(16), + action, + TaskStatus.SCHEDULED, + timestamp, + resource_id, + params, + result=None, + error=None, + ) + await self._store.insert_scheduled_task(task) + + return task.id + + async def update_task( + self, + id: str, + *, + timestamp: Optional[int] = None, + status: Optional[TaskStatus] = None, + result: Optional[JsonMapping] = None, + error: Optional[str] = None, + ) -> bool: + """Update some task associated values. This is exposed publically so it can + be used inside task functions, mainly to update the result and be able to + resume a task at a specific step after a restart of synapse. + + It can also be used to stage a task, by setting the `status` to `SCHEDULED` with + a new timestamp. + + The `status` can only be set to `ACTIVE` or `SCHEDULED`, `COMPLETE` and `FAILED` + are terminal status and can only be set by returning it in the function. + + Args: + id: the id of the task to update + timestamp: useful to schedule a new stage of the task at a later date + status: the new `TaskStatus` of the task + result: the new result of the task + error: the new error of the task + """ + if status == TaskStatus.COMPLETE or status == TaskStatus.FAILED: + raise Exception( + "update_task can't be called with a FAILED or COMPLETE status" + ) + + if timestamp is None: + timestamp = self._clock.time_msec() + return await self._store.update_scheduled_task( + id, + timestamp, + status=status, + result=result, + error=error, + ) + + async def get_task(self, id: str) -> Optional[ScheduledTask]: + """Get a specific task description by id. + + Args: + id: the id of the task to retrieve + + Returns: + The task information or `None` if it doesn't exist or it has + already been removed because it's too old. + """ + return await self._store.get_scheduled_task(id) + + async def get_tasks( + self, + *, + actions: Optional[List[str]] = None, + resource_id: Optional[str] = None, + statuses: Optional[List[TaskStatus]] = None, + max_timestamp: Optional[int] = None, + ) -> List[ScheduledTask]: + """Get a list of tasks. Returns all the tasks if no args is provided. + + If an arg is `None` all tasks matching the other args will be selected. + If an arg is an empty list, the corresponding value of the task needs + to be `None` to be selected. + + Args: + actions: Limit the returned tasks to those specific action names + resource_id: Limit the returned tasks to the specific resource id, if specified + statuses: Limit the returned tasks to the specific statuses + max_timestamp: Limit the returned tasks to the ones that have + a timestamp inferior to the specified one + + Returns + A list of `ScheduledTask`, ordered by increasing timestamps + """ + return await self._store.get_scheduled_tasks( + actions=actions, + resource_id=resource_id, + statuses=statuses, + max_timestamp=max_timestamp, + ) + + async def delete_task(self, id: str) -> None: + """Delete a task. Running tasks can't be deleted. + + Can only be called from the worker handling the task scheduling. + + Args: + id: id of the task to delete + """ + if self.task_is_running(id): + raise Exception(f"Task {id} is currently running and can't be deleted") + await self._store.delete_scheduled_task(id) + + def task_is_running(self, id: str) -> bool: + """Check if a task is currently running. + + Can only be called from the worker handling the task scheduling. + + Args: + id: id of the task to check + """ + assert self._run_background_tasks + return id in self._running_tasks + + async def _handle_scheduled_tasks(self) -> None: + """Main loop taking care of launching tasks and cleaning up old ones.""" + await self._launch_scheduled_tasks() + await self._clean_scheduled_tasks() + + async def _launch_scheduled_tasks(self) -> None: + """Retrieve and launch scheduled tasks that should be running at that time.""" + for task in await self.get_tasks(statuses=[TaskStatus.ACTIVE]): + if not self.task_is_running(task.id): + if ( + len(self._running_tasks) + < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + ): + await self._launch_task(task, first_launch=False) + else: + if ( + self._clock.time_msec() + > task.timestamp + TaskScheduler.LAST_UPDATE_BEFORE_WARNING_MS + ): + logger.warn( + f"Task {task.id} (action {task.action}) has seen no update for more than 24h and may be stuck" + ) + for task in await self.get_tasks( + statuses=[TaskStatus.SCHEDULED], max_timestamp=self._clock.time_msec() + ): + if ( + not self.task_is_running(task.id) + and len(self._running_tasks) + < TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + ): + await self._launch_task(task, first_launch=True) + + running_tasks_gauge.set(len(self._running_tasks)) + + async def _clean_scheduled_tasks(self) -> None: + """Clean old complete or failed jobs to avoid clutter the DB.""" + for task in await self._store.get_scheduled_tasks( + statuses=[TaskStatus.FAILED, TaskStatus.COMPLETE] + ): + # FAILED and COMPLETE tasks should never be running + assert not self.task_is_running(task.id) + if ( + self._clock.time_msec() + > task.timestamp + TaskScheduler.KEEP_TASKS_FOR_MS + ): + await self._store.delete_scheduled_task(task.id) + + async def _launch_task(self, task: ScheduledTask, first_launch: bool) -> None: + """Launch a scheduled task now. + + Args: + task: the task to launch + first_launch: `True` if it's the first time is launched, `False` otherwise + """ + assert task.action in self._actions + + function = self._actions[task.action] + + async def wrapper() -> None: + try: + (status, result, error) = await function(task, first_launch) + except Exception: + f = Failure() + logger.error( + f"scheduled task {task.id} failed", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) + status = TaskStatus.FAILED + result = None + error = f.getErrorMessage() + + await self._store.update_scheduled_task( + task.id, + self._clock.time_msec(), + status=status, + result=result, + error=error, + ) + self._running_tasks.remove(task.id) + + self._running_tasks.add(task.id) + await self.update_task(task.id, status=TaskStatus.ACTIVE) + description = f"{task.id}-{task.action}" + run_as_background_process(description, wrapper) diff --git a/tests/util/test_task_scheduler.py b/tests/util/test_task_scheduler.py new file mode 100644 index 0000000000..3a97559bf0 --- /dev/null +++ b/tests/util/test_task_scheduler.py @@ -0,0 +1,186 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Optional, Tuple + +from twisted.internet.task import deferLater +from twisted.test.proto_helpers import MemoryReactor + +from synapse.server import HomeServer +from synapse.types import JsonMapping, ScheduledTask, TaskStatus +from synapse.util import Clock +from synapse.util.task_scheduler import TaskScheduler + +from tests import unittest + + +class TestTaskScheduler(unittest.HomeserverTestCase): + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.task_scheduler = hs.get_task_scheduler() + self.task_scheduler.register_action(self._test_task, "_test_task") + self.task_scheduler.register_action(self._sleeping_task, "_sleeping_task") + self.task_scheduler.register_action(self._raising_task, "_raising_task") + self.task_scheduler.register_action(self._resumable_task, "_resumable_task") + + async def _test_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + # This test task will copy the parameters to the result + result = None + if task.params: + result = task.params + return (TaskStatus.COMPLETE, result, None) + + def test_schedule_task(self) -> None: + """Schedule a task in the future with some parameters to be copied as a result and check it executed correctly. + Also check that it get removed after `KEEP_TASKS_FOR_MS`.""" + timestamp = self.clock.time_msec() + 30 * 1000 + task_id = self.get_success( + self.task_scheduler.schedule_task( + "_test_task", + timestamp=timestamp, + params={"val": 1}, + ) + ) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.SCHEDULED) + self.assertIsNone(task.result) + + # The timestamp being 30s after now the task should been executed + # after the first scheduling loop is run + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.COMPLETE) + assert task.result is not None + # The passed parameter should have been copied to the result + self.assertTrue(task.result.get("val") == 1) + + # Let's wait for the complete task to be deleted and hence unavailable + self.reactor.advance((TaskScheduler.KEEP_TASKS_FOR_MS / 1000) + 1) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + self.assertIsNone(task) + + async def _sleeping_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + # Sleep for a second + await deferLater(self.reactor, 1, lambda: None) + return TaskStatus.COMPLETE, None, None + + def test_schedule_lot_of_tasks(self) -> None: + """Schedule more than `TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS` tasks and check the behavior.""" + timestamp = self.clock.time_msec() + 30 * 1000 + task_ids = [] + for i in range(TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS + 1): + task_ids.append( + self.get_success( + self.task_scheduler.schedule_task( + "_sleeping_task", + timestamp=timestamp, + params={"val": i}, + ) + ) + ) + + # The timestamp being 30s after now the task should been executed + # after the first scheduling loop is run + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + + # This is to give the time to the sleeping tasks to finish + self.reactor.advance(1) + + # Check that only MAX_CONCURRENT_RUNNING_TASKS tasks has run and that one + # is still scheduled. + tasks = [ + self.get_success(self.task_scheduler.get_task(task_id)) + for task_id in task_ids + ] + + self.assertEquals( + len( + [t for t in tasks if t is not None and t.status == TaskStatus.COMPLETE] + ), + TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS, + ) + + scheduled_tasks = [ + t for t in tasks if t is not None and t.status == TaskStatus.SCHEDULED + ] + self.assertEquals(len(scheduled_tasks), 1) + + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + self.reactor.advance(1) + + # Check that the last task has been properly executed after the next scheduler loop run + prev_scheduled_task = self.get_success( + self.task_scheduler.get_task(scheduled_tasks[0].id) + ) + assert prev_scheduled_task is not None + self.assertEquals( + prev_scheduled_task.status, + TaskStatus.COMPLETE, + ) + + async def _raising_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + raise Exception("raising") + + def test_schedule_raising_task(self) -> None: + """Schedule a task raising an exception and check it runs to failure and report exception content.""" + task_id = self.get_success(self.task_scheduler.schedule_task("_raising_task")) + + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.FAILED) + self.assertEqual(task.error, "raising") + + async def _resumable_task( + self, task: ScheduledTask, first_launch: bool + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + if task.result and "in_progress" in task.result: + return TaskStatus.COMPLETE, {"success": True}, None + else: + await self.task_scheduler.update_task(task.id, result={"in_progress": True}) + # Await forever to simulate an aborted task because of a restart + await deferLater(self.reactor, 2**16, lambda: None) + # This should never been called + return TaskStatus.ACTIVE, None, None + + def test_schedule_resumable_task(self) -> None: + """Schedule a resumable task and check that it gets properly resumed and complete after simulating a synapse restart.""" + task_id = self.get_success(self.task_scheduler.schedule_task("_resumable_task")) + + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.ACTIVE) + + # Simulate a synapse restart by emptying the list of running tasks + self.task_scheduler._running_tasks = set() + self.reactor.advance((TaskScheduler.SCHEDULE_INTERVAL_MS / 1000)) + + task = self.get_success(self.task_scheduler.get_task(task_id)) + assert task is not None + self.assertEqual(task.status, TaskStatus.COMPLETE) + assert task.result is not None + self.assertTrue(task.result.get("success")) -- cgit 1.5.1 From 7dbac123f98a2d59d09a63efe4543ee850a8d630 Mon Sep 17 00:00:00 2001 From: Hugh Nimmo-Smith Date: Tue, 22 Aug 2023 12:42:08 +0100 Subject: Disallow user_consent where experimental MSC3861 is enabled (#16127) --- changelog.d/16127.bugfix | 1 + synapse/config/experimental.py | 7 +++++++ tests/config/test_oauth_delegation.py | 16 ++++++++++++++++ 3 files changed, 24 insertions(+) create mode 100644 changelog.d/16127.bugfix (limited to 'tests') diff --git a/changelog.d/16127.bugfix b/changelog.d/16127.bugfix new file mode 100644 index 0000000000..0308fdfd45 --- /dev/null +++ b/changelog.d/16127.bugfix @@ -0,0 +1 @@ +User consent features cannot be enabled when using experimental MSC3861. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index ac9449b18f..d4cf9a0555 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -173,6 +173,13 @@ class MSC3861: ("enable_registration",), ) + # We only need to test the user consent version, as if it must be set if the user_consent section was present in the config + if root.consent.user_consent_version is not None: + raise ConfigError( + "User consent cannot be enabled when OAuth delegation is enabled", + ("user_consent",), + ) + if ( root.oidc.oidc_enabled or root.saml2.saml2_enabled diff --git a/tests/config/test_oauth_delegation.py b/tests/config/test_oauth_delegation.py index f57c813a58..35f7b85dc7 100644 --- a/tests/config/test_oauth_delegation.py +++ b/tests/config/test_oauth_delegation.py @@ -12,6 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import os from unittest.mock import Mock from synapse.config import ConfigError @@ -167,6 +168,21 @@ class MSC3861OAuthDelegation(TestCase): with self.assertRaises(ConfigError): self.parse_config() + def test_user_consent_cannot_be_enabled(self) -> None: + tmpdir = self.mktemp() + os.mkdir(tmpdir) + self.config_dict["user_consent"] = { + "require_at_registration": True, + "version": "1", + "template_dir": tmpdir, + "server_notice_content": { + "msgtype": "m.text", + "body": "foo", + }, + } + with self.assertRaises(ConfigError): + self.parse_config() + def test_password_config_cannot_be_enabled(self) -> None: self.config_dict["password_config"] = {"enabled": True} with self.assertRaises(ConfigError): -- cgit 1.5.1 From 6d7c63fcc6e4e8f5bb24f471c5308d4cf4acafab Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Aug 2023 07:46:32 -0400 Subject: Properly call setup_background_tasks in unit tests. (#16150) This should only be called on HomeServer objects which are configured to run background tasks, which is automatically (and properly) done via the call to setup(). --- changelog.d/16150.misc | 1 + tests/server.py | 2 -- 2 files changed, 1 insertion(+), 2 deletions(-) create mode 100644 changelog.d/16150.misc (limited to 'tests') diff --git a/changelog.d/16150.misc b/changelog.d/16150.misc new file mode 100644 index 0000000000..97861282fd --- /dev/null +++ b/changelog.d/16150.misc @@ -0,0 +1 @@ +Clean-up calling `setup_background_tasks` in unit tests. diff --git a/tests/server.py b/tests/server.py index 481fe34c5c..ff03d28864 100644 --- a/tests/server.py +++ b/tests/server.py @@ -1000,8 +1000,6 @@ def setup_test_homeserver( hs.tls_server_context_factory = Mock() hs.setup() - if homeserver_to_use == TestHomeServer: - hs.setup_background_tasks() if isinstance(db_engine, PostgresEngine): database_pool = hs.get_datastores().databases[0] -- cgit 1.5.1 From b657e89005bc48e5e061d63ae35e12bf23b81d88 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Aug 2023 09:08:24 -0400 Subject: Fix user directory test for deactivated support user. (#16157) Support users should not be added to the user directory after being deactivated. --- changelog.d/16157.misc | 1 + tests/handlers/test_user_directory.py | 19 +++++++++++-------- 2 files changed, 12 insertions(+), 8 deletions(-) create mode 100644 changelog.d/16157.misc (limited to 'tests') diff --git a/changelog.d/16157.misc b/changelog.d/16157.misc new file mode 100644 index 0000000000..c9d8999cca --- /dev/null +++ b/changelog.d/16157.misc @@ -0,0 +1 @@ +Fix assertion in user directory unit tests. diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 9785dd698b..430209705e 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -446,6 +446,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self.assertIsNone(profile) def test_handle_user_deactivated_support_user(self) -> None: + """Ensure a support user doesn't get added to the user directory after deactivation.""" s_user_id = "@support:test" self.get_success( self.store.register_user( @@ -453,14 +454,16 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): ) ) - mock_remove_from_user_dir = Mock(return_value=make_awaitable(None)) - with patch.object( - self.store, "remove_from_user_dir", mock_remove_from_user_dir - ): - self.get_success(self.handler.handle_local_user_deactivated(s_user_id)) - # BUG: the correct spelling is assert_not_called, but that makes the test fail - # and it's not clear that this is actually the behaviour we want. - mock_remove_from_user_dir.not_called() + # The profile should not be in the directory. + profile = self.get_success(self.store._get_user_in_directory(s_user_id)) + self.assertIsNone(profile) + + # Remove the user from the directory. + self.get_success(self.handler.handle_local_user_deactivated(s_user_id)) + + # The profile should still not be in the user directory. + profile = self.get_success(self.store._get_user_in_directory(s_user_id)) + self.assertIsNone(profile) def test_handle_user_deactivated_regular_user(self) -> None: r_user_id = "@regular:test" -- cgit 1.5.1 From 69048f7b4848ab6a4ae6cb233f8cbf36d73c0ba1 Mon Sep 17 00:00:00 2001 From: Shay Date: Tue, 22 Aug 2023 07:15:34 -0700 Subject: Add an admin endpoint to allow authorizing server to signal token revocations (#16125) --- changelog.d/16125.misc | 1 + synapse/api/auth/msc3861_delegated.py | 13 +++++ synapse/replication/tcp/client.py | 12 +++++ synapse/rest/admin/__init__.py | 3 ++ synapse/rest/admin/oidc.py | 55 +++++++++++++++++++ synapse/storage/databases/main/cache.py | 13 +++++ synapse/storage/databases/main/devices.py | 9 ++++ synapse/util/caches/expiringcache.py | 22 ++++++++ tests/handlers/test_oauth_delegation.py | 34 +++++++++++- tests/replication/test_intro_token_invalidation.py | 62 ++++++++++++++++++++++ 10 files changed, 223 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16125.misc create mode 100644 synapse/rest/admin/oidc.py create mode 100644 tests/replication/test_intro_token_invalidation.py (limited to 'tests') diff --git a/changelog.d/16125.misc b/changelog.d/16125.misc new file mode 100644 index 0000000000..2f1bf23108 --- /dev/null +++ b/changelog.d/16125.misc @@ -0,0 +1 @@ +Add an admin endpoint to allow authorizing server to signal token revocations. diff --git a/synapse/api/auth/msc3861_delegated.py b/synapse/api/auth/msc3861_delegated.py index 4bdfe31b22..14cba50c90 100644 --- a/synapse/api/auth/msc3861_delegated.py +++ b/synapse/api/auth/msc3861_delegated.py @@ -438,3 +438,16 @@ class MSC3861DelegatedAuth(BaseAuth): scope=scope, is_guest=(has_guest_scope and not has_user_scope), ) + + def invalidate_cached_tokens(self, keys: List[str]) -> None: + """ + Invalidate the entry(s) in the introspection token cache corresponding to the given key + """ + for key in keys: + self._token_cache.invalidate(key) + + def invalidate_token_cache(self) -> None: + """ + Invalidate the entire token cache. + """ + self._token_cache.invalidate_all() diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 139f57cf86..04e8cff6ea 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -26,6 +26,7 @@ from synapse.logging.context import PreserveLoggingContext, make_deferred_yielda from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.streams import ( AccountDataStream, + CachesStream, DeviceListsStream, PushersStream, PushRulesStream, @@ -73,6 +74,7 @@ class ReplicationDataHandler: self._instance_name = hs.get_instance_name() self._typing_handler = hs.get_typing_handler() self._state_storage_controller = hs.get_storage_controllers().state + self.auth = hs.get_auth() self._notify_pushers = hs.config.worker.start_pushers self._pusher_pool = hs.get_pusherpool() @@ -218,6 +220,16 @@ class ReplicationDataHandler: self._state_storage_controller.notify_event_un_partial_stated( row.event_id ) + # invalidate the introspection token cache + elif stream_name == CachesStream.NAME: + for row in rows: + if row.cache_func == "introspection_token_invalidation": + if row.keys[0] is None: + # invalidate the whole cache + # mypy ignore - the token cache is defined on MSC3861DelegatedAuth + self.auth.invalidate_token_cache() # type: ignore[attr-defined] + else: + self.auth.invalidate_cached_tokens(row.keys) # type: ignore[attr-defined] await self._presence_handler.process_replication_rows( stream_name, instance_name, token, rows diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py index fe8177ed4d..55e752fda8 100644 --- a/synapse/rest/admin/__init__.py +++ b/synapse/rest/admin/__init__.py @@ -47,6 +47,7 @@ from synapse.rest.admin.federation import ( ListDestinationsRestServlet, ) from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo +from synapse.rest.admin.oidc import OIDCTokenRevocationRestServlet from synapse.rest.admin.registration_tokens import ( ListRegistrationTokensRestServlet, NewRegistrationTokenRestServlet, @@ -297,6 +298,8 @@ def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None: BackgroundUpdateRestServlet(hs).register(http_server) BackgroundUpdateStartJobRestServlet(hs).register(http_server) ExperimentalFeaturesRestServlet(hs).register(http_server) + if hs.config.experimental.msc3861.enabled: + OIDCTokenRevocationRestServlet(hs).register(http_server) def register_servlets_for_client_rest_resource( diff --git a/synapse/rest/admin/oidc.py b/synapse/rest/admin/oidc.py new file mode 100644 index 0000000000..64d2d40550 --- /dev/null +++ b/synapse/rest/admin/oidc.py @@ -0,0 +1,55 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from http import HTTPStatus +from typing import TYPE_CHECKING, Dict, Tuple + +from synapse.http.servlet import RestServlet +from synapse.http.site import SynapseRequest +from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +class OIDCTokenRevocationRestServlet(RestServlet): + """ + Delete a given token introspection response - identified by the `jti` field - from the + introspection token cache when a token is revoked at the authorizing server + """ + + PATTERNS = admin_patterns("/OIDC_token_revocation/(?P[^/]*)") + + def __init__(self, hs: "HomeServer"): + super().__init__() + auth = hs.get_auth() + + # If this endpoint is loaded then we must have enabled delegated auth. + from synapse.api.auth.msc3861_delegated import MSC3861DelegatedAuth + + assert isinstance(auth, MSC3861DelegatedAuth) + + self.auth = auth + self.store = hs.get_datastores().main + + async def on_DELETE( + self, request: SynapseRequest, token_id: str + ) -> Tuple[HTTPStatus, Dict]: + await assert_requester_is_admin(self.auth, request) + + self.auth._token_cache.invalidate(token_id) + + # make sure we invalidate the cache on any workers + await self.store.stream_introspection_token_invalidation((token_id,)) + + return HTTPStatus.OK, {} diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2fbd389c71..18905e07b6 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -584,6 +584,19 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: return 0 + async def stream_introspection_token_invalidation( + self, key: Tuple[Optional[str]] + ) -> None: + """ + Stream an invalidation request for the introspection token cache to workers + + Args: + key: token_id of the introspection token to remove from the cache + """ + await self.send_invalidation_to_replication( + "introspection_token_invalidation", key + ) + @wrap_as_background_process("clean_up_old_cache_invalidations") async def _clean_up_cache_invalidation_wrapper(self) -> None: """ diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index e4162f846b..fa69a4a298 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -33,6 +33,7 @@ from typing_extensions import Literal from synapse.api.constants import EduTypes from synapse.api.errors import Codes, StoreError +from synapse.config.homeserver import HomeServerConfig from synapse.logging.opentracing import ( get_active_span_text_map, set_tag, @@ -1663,6 +1664,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): self.device_id_exists_cache: LruCache[ Tuple[str, str], Literal[True] ] = LruCache(cache_name="device_id_exists", max_size=10000) + self.config: HomeServerConfig = hs.config async def store_device( self, @@ -1784,6 +1786,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): for device_id in device_ids: self.device_id_exists_cache.invalidate((user_id, device_id)) + # TODO: don't nuke the entire cache once there is a way to associate + # device_id -> introspection_token + if self.config.experimental.msc3861.enabled: + # mypy ignore - the token cache is defined on MSC3861DelegatedAuth + self.auth._token_cache.invalidate_all() # type: ignore[attr-defined] + await self.stream_introspection_token_invalidation((None,)) + async def update_device( self, user_id: str, device_id: str, new_display_name: Optional[str] = None ) -> None: diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 01ad02af67..9a3e10ddee 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -140,6 +140,20 @@ class ExpiringCache(Generic[KT, VT]): return value.value + def invalidate(self, key: KT) -> None: + """ + Remove the given key from the cache. + """ + + value = self._cache.pop(key, None) + if value: + if self.iterable: + self.metrics.inc_evictions( + EvictionReason.invalidation, len(value.value) + ) + else: + self.metrics.inc_evictions(EvictionReason.invalidation) + def __contains__(self, key: KT) -> bool: return key in self._cache @@ -193,6 +207,14 @@ class ExpiringCache(Generic[KT, VT]): len(self), ) + def invalidate_all(self) -> None: + """ + Remove all items from the cache. + """ + keys = set(self._cache.keys()) + for key in keys: + self._cache.pop(key) + def __len__(self) -> int: if self.iterable: return sum(len(entry.value) for entry in self._cache.values()) diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py index 1456b675a7..b891e84690 100644 --- a/tests/handlers/test_oauth_delegation.py +++ b/tests/handlers/test_oauth_delegation.py @@ -14,7 +14,7 @@ from http import HTTPStatus from typing import Any, Dict, Union -from unittest.mock import ANY, Mock +from unittest.mock import ANY, AsyncMock, Mock from urllib.parse import parse_qs from signedjson.key import ( @@ -588,6 +588,38 @@ class MSC3861OAuthDelegation(HomeserverTestCase): ) self.assertEqual(self.http_client.request.call_count, 2) + def test_revocation_endpoint(self) -> None: + # mock introspection response and then admin verification response + self.http_client.request = AsyncMock( + side_effect=[ + FakeResponse.json( + code=200, payload={"active": True, "jti": "open_sesame"} + ), + FakeResponse.json( + code=200, + payload={ + "active": True, + "sub": SUBJECT, + "scope": " ".join([SYNAPSE_ADMIN_SCOPE, MATRIX_USER_SCOPE]), + "username": USERNAME, + }, + ), + ] + ) + + # cache a token to delete + introspection_token = self.get_success( + self.auth._introspect_token("open_sesame") # type: ignore[attr-defined] + ) + self.assertEqual(self.auth._token_cache.get("open_sesame"), introspection_token) # type: ignore[attr-defined] + + # delete the revoked token + introspection_token_id = "open_sesame" + url = f"/_synapse/admin/v1/OIDC_token_revocation/{introspection_token_id}" + channel = self.make_request("DELETE", url, access_token="mockAccessToken") + self.assertEqual(channel.code, 200) + self.assertEqual(self.auth._token_cache.get("open_sesame"), None) # type: ignore[attr-defined] + def make_device_keys(self, user_id: str, device_id: str) -> JsonDict: # We only generate a master key to simplify the test. master_signing_key = generate_signing_key(device_id) diff --git a/tests/replication/test_intro_token_invalidation.py b/tests/replication/test_intro_token_invalidation.py new file mode 100644 index 0000000000..f90678b6b1 --- /dev/null +++ b/tests/replication/test_intro_token_invalidation.py @@ -0,0 +1,62 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict + +import synapse.rest.admin._base + +from tests.replication._base import BaseMultiWorkerStreamTestCase + + +class IntrospectionTokenCacheInvalidationTestCase(BaseMultiWorkerStreamTestCase): + servlets = [synapse.rest.admin.register_servlets] + + def default_config(self) -> Dict[str, Any]: + config = super().default_config() + config["disable_registration"] = True + config["experimental_features"] = { + "msc3861": { + "enabled": True, + "issuer": "some_dude", + "client_id": "ID", + "client_auth_method": "client_secret_post", + "client_secret": "secret", + } + } + return config + + def test_stream_introspection_token_invalidation(self) -> None: + worker_hs = self.make_worker_hs("synapse.app.generic_worker") + auth = worker_hs.get_auth() + store = self.hs.get_datastores().main + + # add a token to the cache on the worker + auth._token_cache["open_sesame"] = "intro_token" # type: ignore[attr-defined] + + # stream the invalidation from the master + self.get_success( + store.stream_introspection_token_invalidation(("open_sesame",)) + ) + + # check that the cache on the worker was invalidated + self.assertEqual(auth._token_cache.get("open_sesame"), None) # type: ignore[attr-defined] + + # test invalidating whole cache + for i in range(0, 5): + auth._token_cache[f"open_sesame_{i}"] = f"intro_token_{i}" # type: ignore[attr-defined] + self.assertEqual(len(auth._token_cache), 5) # type: ignore[attr-defined] + + self.get_success(store.stream_introspection_token_invalidation((None,))) + + self.assertEqual(len(auth._token_cache), 0) # type: ignore[attr-defined] -- cgit 1.5.1 From 0ba17777be81ba9457defb407112b664042a14d2 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Tue, 22 Aug 2023 16:47:59 +0200 Subject: Disable `m.3pid_changes` capability when MSC3861 is enabled. (#16134) --- changelog.d/16127.bugfix | 2 +- changelog.d/16134.bugfix | 1 + synapse/config/experimental.py | 6 ++++++ synapse/config/registration.py | 11 ++++++++++- tests/config/test_oauth_delegation.py | 5 +++++ 5 files changed, 23 insertions(+), 2 deletions(-) create mode 100644 changelog.d/16134.bugfix (limited to 'tests') diff --git a/changelog.d/16127.bugfix b/changelog.d/16127.bugfix index 0308fdfd45..9ce5f4a705 100644 --- a/changelog.d/16127.bugfix +++ b/changelog.d/16127.bugfix @@ -1 +1 @@ -User consent features cannot be enabled when using experimental MSC3861. +User constent and 3-PID changes capability cannot be enabled when using experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support. diff --git a/changelog.d/16134.bugfix b/changelog.d/16134.bugfix new file mode 100644 index 0000000000..9ce5f4a705 --- /dev/null +++ b/changelog.d/16134.bugfix @@ -0,0 +1 @@ +User constent and 3-PID changes capability cannot be enabled when using experimental [MSC3861](https://github.com/matrix-org/matrix-spec-proposals/pull/3861) support. diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index d4cf9a0555..277ea4675b 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -223,6 +223,12 @@ class MSC3861: ("session_lifetime",), ) + if root.registration.enable_3pid_changes: + raise ConfigError( + "enable_3pid_changes cannot be enabled when OAuth delegation is enabled", + ("enable_3pid_changes",), + ) + @attr.s(auto_attribs=True, frozen=True, slots=True) class MSC3866Config: diff --git a/synapse/config/registration.py b/synapse/config/registration.py index df1d83dfaa..b8ad6fbc06 100644 --- a/synapse/config/registration.py +++ b/synapse/config/registration.py @@ -133,7 +133,16 @@ class RegistrationConfig(Config): self.enable_set_displayname = config.get("enable_set_displayname", True) self.enable_set_avatar_url = config.get("enable_set_avatar_url", True) - self.enable_3pid_changes = config.get("enable_3pid_changes", True) + + # The default value of enable_3pid_changes is True, unless msc3861 is enabled. + msc3861_enabled = ( + (config.get("experimental_features") or {}) + .get("msc3861", {}) + .get("enabled", False) + ) + self.enable_3pid_changes = config.get( + "enable_3pid_changes", not msc3861_enabled + ) self.disable_msisdn_registration = config.get( "disable_msisdn_registration", False diff --git a/tests/config/test_oauth_delegation.py b/tests/config/test_oauth_delegation.py index 35f7b85dc7..5c91031746 100644 --- a/tests/config/test_oauth_delegation.py +++ b/tests/config/test_oauth_delegation.py @@ -271,3 +271,8 @@ class MSC3861OAuthDelegation(TestCase): self.config_dict["session_lifetime"] = "24h" with self.assertRaises(ConfigError): self.parse_config() + + def test_enable_3pid_changes_cannot_be_enabled(self) -> None: + self.config_dict["enable_3pid_changes"] = True + with self.assertRaises(ConfigError): + self.parse_config() -- cgit 1.5.1 From 3f17178728fa4029c2504f4ceb7377dc888512ab Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 22 Aug 2023 11:43:44 -0400 Subject: Clean-up presence tests (#16158) Reduce duplicated code & remove unused variables. --- changelog.d/16158.misc | 1 + tests/handlers/test_presence.py | 129 ++++++++++++---------------------------- 2 files changed, 38 insertions(+), 92 deletions(-) create mode 100644 changelog.d/16158.misc (limited to 'tests') diff --git a/changelog.d/16158.misc b/changelog.d/16158.misc new file mode 100644 index 0000000000..41059378c5 --- /dev/null +++ b/changelog.d/16158.misc @@ -0,0 +1 @@ +Improve presence tests. diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index fd66d573d2..1f483eb75a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -514,6 +514,9 @@ class PresenceTimeoutTestCase(unittest.TestCase): class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): + user_id = "@test:server" + user_id_obj = UserID.from_string(user_id) + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.presence_handler = hs.get_presence_handler() self.clock = hs.get_clock() @@ -523,12 +526,11 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): we time out their syncing users presence. """ process_id = "1" - user_id = "@test:server" # Notify handler that a user is now syncing. self.get_success( self.presence_handler.update_external_syncs_row( - process_id, user_id, True, self.clock.time_msec() + process_id, self.user_id, True, self.clock.time_msec() ) ) @@ -536,48 +538,37 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # stopped syncing that their presence state doesn't get timed out. self.reactor.advance(EXTERNAL_PROCESS_EXPIRY / 2) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) self.assertEqual(state.state, PresenceState.ONLINE) # Check that if the external process timeout fires, then the syncing # user gets timed out self.reactor.advance(EXTERNAL_PROCESS_EXPIRY) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) self.assertEqual(state.state, PresenceState.OFFLINE) def test_user_goes_offline_by_timeout_status_msg_remain(self) -> None: """Test that if a user doesn't update the records for a while users presence goes `OFFLINE` because of timeout and `status_msg` remains. """ - user_id = "@test:server" status_msg = "I'm here!" # Mark user as online - self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg) # Check that if we wait a while without telling the handler the user has # stopped syncing that their presence state doesn't get timed out. self.reactor.advance(SYNC_ONLINE_TIMEOUT / 2) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) self.assertEqual(state.state, PresenceState.ONLINE) self.assertEqual(state.status_msg, status_msg) # Check that if the timeout fires, then the syncing user gets timed out self.reactor.advance(SYNC_ONLINE_TIMEOUT) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) # status_msg should remain even after going offline self.assertEqual(state.state, PresenceState.OFFLINE) self.assertEqual(state.status_msg, status_msg) @@ -586,24 +577,19 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): """Test that if a user change presence manually to `OFFLINE` and no status is set, that `status_msg` is `None`. """ - user_id = "@test:server" status_msg = "I'm here!" # Mark user as online - self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg) # Mark user as offline self.get_success( self.presence_handler.set_state( - UserID.from_string(user_id), {"presence": PresenceState.OFFLINE} + self.user_id_obj, {"presence": PresenceState.OFFLINE} ) ) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) self.assertEqual(state.state, PresenceState.OFFLINE) self.assertEqual(state.status_msg, None) @@ -611,41 +597,31 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): """Test that if a user change presence manually to `OFFLINE` and a status is set, that `status_msg` appears. """ - user_id = "@test:server" status_msg = "I'm here!" # Mark user as online - self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg) # Mark user as offline - self._set_presencestate_with_status_msg( - user_id, PresenceState.OFFLINE, "And now here." - ) + self._set_presencestate_with_status_msg(PresenceState.OFFLINE, "And now here.") def test_user_reset_online_with_no_status(self) -> None: """Test that if a user set again the presence manually and no status is set, that `status_msg` is `None`. """ - user_id = "@test:server" status_msg = "I'm here!" # Mark user as online - self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg) # Mark user as online again self.get_success( self.presence_handler.set_state( - UserID.from_string(user_id), {"presence": PresenceState.ONLINE} + self.user_id_obj, {"presence": PresenceState.ONLINE} ) ) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) # status_msg should remain even after going offline self.assertEqual(state.state, PresenceState.ONLINE) self.assertEqual(state.status_msg, None) @@ -654,33 +630,27 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): """Test that if a user set again the presence manually and status is `None`, that `status_msg` is `None`. """ - user_id = "@test:server" status_msg = "I'm here!" # Mark user as online - self._set_presencestate_with_status_msg( - user_id, PresenceState.ONLINE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.ONLINE, status_msg) # Mark user as online and `status_msg = None` - self._set_presencestate_with_status_msg(user_id, PresenceState.ONLINE, None) + self._set_presencestate_with_status_msg(PresenceState.ONLINE, None) def test_set_presence_from_syncing_not_set(self) -> None: """Test that presence is not set by syncing if affect_presence is false""" - user_id = "@test:server" status_msg = "I'm here!" - self._set_presencestate_with_status_msg( - user_id, PresenceState.UNAVAILABLE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(user_id, False, PresenceState.ONLINE) + self.presence_handler.user_syncing( + self.user_id, False, PresenceState.ONLINE + ) ) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) # we should still be unavailable self.assertEqual(state.state, PresenceState.UNAVAILABLE) # and status message should still be the same @@ -688,50 +658,34 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): def test_set_presence_from_syncing_is_set(self) -> None: """Test that presence is set by syncing if affect_presence is true""" - user_id = "@test:server" status_msg = "I'm here!" - self._set_presencestate_with_status_msg( - user_id, PresenceState.UNAVAILABLE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE) ) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) # we should now be online self.assertEqual(state.state, PresenceState.ONLINE) def test_set_presence_from_syncing_keeps_status(self) -> None: """Test that presence set by syncing retains status message""" - user_id = "@test:server" status_msg = "I'm here!" - self._set_presencestate_with_status_msg( - user_id, PresenceState.UNAVAILABLE, status_msg - ) + self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE) ) - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) # our status message should be the same as it was before self.assertEqual(state.status_msg, status_msg) @parameterized.expand([(False,), (True,)]) - @unittest.override_config( - { - "experimental_features": { - "msc3026_enabled": True, - }, - } - ) + @unittest.override_config({"experimental_features": {"msc3026_enabled": True}}) def test_set_presence_from_syncing_keeps_busy( self, test_with_workers: bool ) -> None: @@ -741,7 +695,6 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): test_with_workers: If True, check the presence state of the user by calling /sync against a worker, rather than the main process. """ - user_id = "@test:server" status_msg = "I'm busy!" # By default, we call /sync against the main process. @@ -755,44 +708,39 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): ) # Set presence to BUSY - self._set_presencestate_with_status_msg(user_id, PresenceState.BUSY, status_msg) + self._set_presencestate_with_status_msg(PresenceState.BUSY, status_msg) # Perform a sync with a presence state other than busy. This should NOT change # our presence status; we only change from busy if we explicitly set it via # /presence/*. self.get_success( worker_to_sync_against.get_presence_handler().user_syncing( - user_id, True, PresenceState.ONLINE + self.user_id, True, PresenceState.ONLINE ) ) # Check against the main process that the user's presence did not change. - state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) # we should still be busy self.assertEqual(state.state, PresenceState.BUSY) def _set_presencestate_with_status_msg( - self, user_id: str, state: str, status_msg: Optional[str] + self, state: str, status_msg: Optional[str] ) -> None: """Set a PresenceState and status_msg and check the result. Args: - user_id: User for that the status is to be set. state: The new PresenceState. status_msg: Status message that is to be set. """ self.get_success( self.presence_handler.set_state( - UserID.from_string(user_id), + self.user_id_obj, {"presence": state, "status_msg": status_msg}, ) ) - new_state = self.get_success( - self.presence_handler.get_state(UserID.from_string(user_id)) - ) + new_state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) self.assertEqual(new_state.state, state) self.assertEqual(new_state.status_msg, status_msg) @@ -952,9 +900,6 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): self.assertEqual(upto_token, now_token) self.assertFalse(limited) - expected_rows = [ - (2, ("dest3", "@user3:test")), - ] self.assertCountEqual(rows, []) prev_token = self.queue.get_current_token(self.instance_name) -- cgit 1.5.1 From 19a1cda084342034cc92c88c0376cbcadbf8e2a0 Mon Sep 17 00:00:00 2001 From: "DeepBlueV7.X" Date: Wed, 23 Aug 2023 08:35:23 +0000 Subject: Properly update retry_last_ts when hitting the maximum retry interval (#16156) * Properly update retry_last_ts when hitting the maximum retry interval This was broken in 1.87 when the maximum retry interval got changed from almost infinite to a week (and made configurable). fixes #16101 Signed-off-by: Nicolas Werner * Add changelog * Change fix + add test * Add comment --------- Signed-off-by: Nicolas Werner Co-authored-by: Mathieu Velten --- changelog.d/16156.bugfix | 1 + synapse/storage/databases/main/transactions.py | 4 +- tests/util/test_retryutils.py | 51 ++++++++++++++++++++++++++ 3 files changed, 55 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16156.bugfix (limited to 'tests') diff --git a/changelog.d/16156.bugfix b/changelog.d/16156.bugfix new file mode 100644 index 0000000000..17284297cf --- /dev/null +++ b/changelog.d/16156.bugfix @@ -0,0 +1 @@ +Fix a bug introduced in 1.87 where synapse would send an excessive amount of federation requests to servers which have been offline for a long time. Contributed by Nico. diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index c3bd36efc9..48e4b0ba3c 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -242,6 +242,8 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): ) -> None: # Upsert retry time interval if retry_interval is zero (i.e. we're # resetting it) or greater than the existing retry interval. + # We also upsert when the new retry interval is the same as the existing one, + # since it will be the case when `destination_max_retry_interval` is reached. # # WARNING: This is executed in autocommit, so we shouldn't add any more # SQL calls in here (without being very careful). @@ -257,7 +259,7 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): WHERE EXCLUDED.retry_interval = 0 OR destinations.retry_interval IS NULL - OR destinations.retry_interval < EXCLUDED.retry_interval + OR destinations.retry_interval <= EXCLUDED.retry_interval """ txn.execute(sql, (destination, failure_ts, retry_last_ts, retry_interval)) diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py index 1277e1a865..4bcd17a6fc 100644 --- a/tests/util/test_retryutils.py +++ b/tests/util/test_retryutils.py @@ -108,3 +108,54 @@ class RetryLimiterTestCase(HomeserverTestCase): new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) self.assertIsNone(new_timings) + + def test_max_retry_interval(self) -> None: + """Test that `destination_max_retry_interval` setting works as expected""" + store = self.hs.get_datastores().main + + destination_max_retry_interval_ms = ( + self.hs.config.federation.destination_max_retry_interval_ms + ) + + self.get_success(get_retry_limiter("test_dest", self.clock, store)) + self.pump(1) + + failure_ts = self.clock.time_msec() + + # Simulate reaching destination_max_retry_interval + self.get_success( + store.set_destination_retry_timings( + "test_dest", + failure_ts=failure_ts, + retry_last_ts=failure_ts, + retry_interval=destination_max_retry_interval_ms, + ) + ) + + # Check it fails + self.get_failure( + get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination + ) + + # Get past retry_interval and we can try again, and still throw an error to continue the backoff + self.reactor.advance(destination_max_retry_interval_ms / 1000 + 1) + limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store)) + self.pump(1) + try: + with limiter: + self.pump(1) + raise AssertionError("argh") + except AssertionError: + pass + + self.pump() + + # retry_interval does not increase and stays at destination_max_retry_interval_ms + new_timings = self.get_success(store.get_destination_retry_timings("test_dest")) + assert new_timings is not None + self.assertEqual(new_timings.retry_interval, destination_max_retry_interval_ms) + + # Check it fails + self.get_failure( + get_retry_limiter("test_dest", self.clock, store), NotRetryingDestination + ) -- cgit 1.5.1