# Copyright 2021 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import (
    Dict,
    Iterable,
    List,
    Mapping,
    NoReturn,
    Optional,
    Sequence,
    Tuple,
    Union,
)
from unittest import mock

from typing_extensions import Literal

from twisted.test.proto_helpers import MemoryReactor

import synapse.rest.admin
from synapse.api.constants import (
    EventContentFields,
    EventTypes,
    JoinRules,
    Membership,
    RestrictedJoinRuleTypes,
    RoomTypes,
)
from synapse.api.room_versions import RoomVersions
from synapse.handlers.space_hierarchy import SpaceHierarchyHandler
from synapse.rest.client import login, room
from synapse.server import HomeServer
from synapse.types import JsonDict
from synapse.util import Clock

from tests import unittest


class RemoveSpaceMemberTestCase(unittest.HomeserverTestCase):
    """Tests removal of a user from a space.

    Every test starts from the hierarchy built in `prepare`: an invite-only
    root space (`self.space_id`) that the target user has joined, containing a
    restricted subspace (`self.subspace_id`). Individual tests attach further
    rooms below the subspace and then call the admin "remove from hierarchy"
    endpoint via `_remove_from_space`.
    """

    servlets = [
        synapse.rest.admin.register_servlets,
        login.register_servlets,
        room.register_servlets,
    ]

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Registers the test users and builds the base space hierarchy."""
        self.store = hs.get_datastore()

        # Create users
        self.admin_user = self.register_user("admin", "pass", admin=True)
        self.admin_user_tok = self.login("admin", "pass")

        self.space_owner_user = self.register_user("space_owner", "pass")
        self.space_owner_user_tok = self.login("space_owner", "pass")

        self.target_user = self.register_user("user", "pass")
        self.target_user_tok = self.login("user", "pass")

        # Create a space hierarchy for testing:
        #   space, invite-only
        #     * subspace, restricted
        self.space_id = self._create_space(JoinRules.INVITE)

        # Make the target user a member of the space
        self.helper.invite(
            self.space_id,
            src=self.space_owner_user,
            targ=self.target_user,
            tok=self.space_owner_user_tok,
        )
        self.helper.join(self.space_id, self.target_user, tok=self.target_user_tok)

        self.subspace_id = self._create_space((JoinRules.RESTRICTED, self.space_id))
        self._add_child(self.space_id, self.subspace_id)

    def _add_child(
        self, space_id: str, room_id: str, via: Optional[List[str]] = None
    ) -> None:
        """Adds a room to a space.

        Args:
            space_id: The space that gains the child.
            room_id: The room to add; used as the state key of the
                space-child event.
            via: The `via` servers to advertise for the child. Defaults to
                this homeserver's hostname.
        """
        if via is None:
            via = [self.hs.hostname]

        self.helper.send_state(
            space_id,
            event_type=EventTypes.SpaceChild,
            body={"via": via},
            tok=self.space_owner_user_tok,
            state_key=room_id,
        )

    def _create_space(
        self,
        join_rules: Union[
            Literal["public", "invite", "knock"],
            Tuple[Literal["restricted"], str],
        ],
    ) -> str:
        """Creates a space.

        Args:
            join_rules: Either a plain join rule, or a
                `("restricted", allowed_space_id)` tuple.

        Returns:
            The ID of the newly created space.
        """
        return self._create_room(
            join_rules,
            extra_content={
                "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE}
            },
        )

    def _create_room(
        self,
        join_rules: Union[
            Literal["public", "invite", "knock"],
            Tuple[Literal["restricted"], str],
        ],
        extra_content: Optional[Dict] = None,
    ) -> str:
        """Creates a room owned by the space owner.

        Args:
            join_rules: Either a plain join rule, or a
                `("restricted", allowed_space_id)` tuple, in which case
                membership of `allowed_space_id` is the allow condition.
            extra_content: Extra content passed through to the room creation
                helper.

        Returns:
            The ID of the newly created room.
        """
        room_id = self.helper.create_room_as(
            self.space_owner_user,
            room_version=RoomVersions.V8.identifier,
            tok=self.space_owner_user_tok,
            extra_content=extra_content,
        )

        if isinstance(join_rules, str):
            self.helper.send_state(
                room_id,
                event_type=EventTypes.JoinRules,
                body={"join_rule": join_rules},
                tok=self.space_owner_user_tok,
            )
        else:
            # Restricted join rules come with the space whose members are
            # allowed to join.
            _, space_id = join_rules
            self.helper.send_state(
                room_id,
                event_type=EventTypes.JoinRules,
                body={
                    "join_rule": JoinRules.RESTRICTED,
                    "allow": [
                        {
                            "type": RestrictedJoinRuleTypes.ROOM_MEMBERSHIP,
                            "room_id": space_id,
                            "via": [self.hs.hostname],
                        }
                    ],
                },
                tok=self.space_owner_user_tok,
            )

        return room_id

    def _remove_from_space(
        self,
        user_id: str,
        space_id: Optional[str] = None,
        include_remote_spaces: Optional[bool] = None,
    ) -> JsonDict:
        """Removes the given user from a space via the admin API.

        Args:
            user_id: The user to remove.
            space_id: The root space to remove the user from. Defaults to the
                test space, `self.space_id`.
            include_remote_spaces: When not `None`, sent as the
                `include_remote_spaces` field of the request body. When
                `None`, the request is sent with an empty body.

        Returns:
            The JSON body of the response. The request is asserted to have
            succeeded with a 200.
        """
        if space_id is None:
            space_id = self.space_id

        content: Union[bytes, JsonDict] = b""
        if include_remote_spaces is not None:
            content = {"include_remote_spaces": include_remote_spaces}

        # Build the URL from the resolved `space_id`, so that an explicitly
        # requested space is honoured rather than silently replaced by the
        # default test space.
        url = f"/_synapse/admin/v1/rooms/{space_id}/hierarchy/members/{user_id}"
        channel = self.make_request(
            "DELETE",
            url.encode("ascii"),
            access_token=self.admin_user_tok,
            content=content,
        )
        self.assertEqual(200, channel.code, channel.json_body)

        return channel.json_body

    def test_public_space(self) -> None:
        """Tests that the user is removed from the space, even if public."""
        self.helper.send_state(
            self.space_id,
            event_type=EventTypes.JoinRules,
            body={"join_rule": JoinRules.PUBLIC},
            tok=self.space_owner_user_tok,
        )

        response = self._remove_from_space(self.target_user)

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id],
                "inaccessible_rooms": [],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, self.space_id
            )
        )
        self.assertEqual(membership, Membership.LEAVE)

    def test_public_room(self) -> None:
        """Tests that the user is not removed from public rooms."""
        public_room_id = self._create_room(JoinRules.PUBLIC)
        self._add_child(self.subspace_id, public_room_id)

        self.helper.join(public_room_id, self.target_user, tok=self.target_user_tok)

        response = self._remove_from_space(self.target_user)

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id],
                "inaccessible_rooms": [],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, public_room_id
            )
        )
        self.assertEqual(membership, Membership.JOIN)

    def test_invited(self) -> None:
        """Tests that the user is made to decline invites to rooms in the space."""
        invite_only_room_id = self._create_room(JoinRules.INVITE)
        self._add_child(self.subspace_id, invite_only_room_id)

        # Invite the user, but do not accept the invite.
        self.helper.invite(
            invite_only_room_id,
            src=self.space_owner_user,
            targ=self.target_user,
            tok=self.space_owner_user_tok,
        )

        response = self._remove_from_space(self.target_user)

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id, invite_only_room_id],
                "inaccessible_rooms": [],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, invite_only_room_id
            )
        )
        self.assertEqual(membership, Membership.LEAVE)

    def test_invite_only_room(self) -> None:
        """Tests that the user is made to leave invite-only rooms."""
        invite_only_room_id = self._create_room(JoinRules.INVITE)
        self._add_child(self.subspace_id, invite_only_room_id)

        self.helper.invite(
            invite_only_room_id,
            src=self.space_owner_user,
            targ=self.target_user,
            tok=self.space_owner_user_tok,
        )
        self.helper.join(
            invite_only_room_id, self.target_user, tok=self.target_user_tok
        )

        response = self._remove_from_space(self.target_user)

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id, invite_only_room_id],
                "inaccessible_rooms": [],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, invite_only_room_id
            )
        )
        self.assertEqual(membership, Membership.LEAVE)

    def test_restricted_room(self) -> None:
        """Tests that the user is made to leave restricted rooms."""
        restricted_room_id = self._create_room((JoinRules.RESTRICTED, self.space_id))
        self._add_child(self.subspace_id, restricted_room_id)
        self.helper.join(restricted_room_id, self.target_user, tok=self.target_user_tok)

        response = self._remove_from_space(self.target_user)

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id, restricted_room_id],
                "inaccessible_rooms": [],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, restricted_room_id
            )
        )
        self.assertEqual(membership, Membership.LEAVE)

    def test_remote_space(self) -> None:
        """Tests that the user is made to leave rooms in a remote space."""
        remote_space_id = "!space:remote"
        self._add_child(self.subspace_id, remote_space_id, via=["remote"])

        # This room is only reachable through the (mocked) remote space; it is
        # not added as a child of any local space.
        restricted_room_id = self._create_room((JoinRules.RESTRICTED, self.space_id))
        self.helper.join(restricted_room_id, self.target_user, tok=self.target_user_tok)

        async def _get_space_children_remote(
            _self: SpaceHierarchyHandler, space_id: str, via: Iterable[str]
        ) -> Tuple[
            Sequence[Tuple[str, Iterable[str]]], Mapping[str, Optional[JsonDict]]
        ]:
            self.assertEqual(space_id, remote_space_id)
            self.assertEqual(via, ["remote"])

            return [(restricted_room_id, [self.hs.hostname])], {}

        with mock.patch(
            "synapse.handlers.space_hierarchy.SpaceHierarchyHandler._get_space_children_remote",
            new=_get_space_children_remote,
        ):
            # The removal is rooted at the local test space; the remote space
            # is only encountered while walking the hierarchy.
            response = self._remove_from_space(
                self.target_user, include_remote_spaces=True
            )

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id, restricted_room_id],
                "inaccessible_rooms": [remote_space_id],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, restricted_room_id
            )
        )
        self.assertEqual(membership, Membership.LEAVE)

    def test_remote_spaces_excluded(self) -> None:
        """Tests the exclusion of remote spaces."""
        remote_space_id = "!space:remote"
        self._add_child(self.subspace_id, remote_space_id, via=["remote"])

        restricted_room_id = self._create_room((JoinRules.RESTRICTED, self.space_id))
        self.helper.join(restricted_room_id, self.target_user, tok=self.target_user_tok)

        async def _get_space_children_remote(
            _self: SpaceHierarchyHandler, space_id: str, via: Iterable[str]
        ) -> NoReturn:
            self.fail(
                f"Unexpected _get_space_children_remote({space_id!r}, {via!r}) call"
            )
            raise  # `fail` is missing type hints

        with mock.patch(
            "synapse.handlers.space_hierarchy.SpaceHierarchyHandler._get_space_children_remote",
            new=_get_space_children_remote,
        ):
            # The removal is rooted at the local test space; with remote
            # spaces excluded, the mocked remote lookup must never be called.
            response = self._remove_from_space(
                self.target_user, include_remote_spaces=False
            )

        self.assertEqual(
            response,
            {
                "left_rooms": [self.space_id],
                "inaccessible_rooms": [remote_space_id],
                "failed_rooms": {},
            },
        )

        membership, _ = self.get_success(
            self.store.get_local_current_membership_for_user_in_room(
                self.target_user, restricted_room_id
            )
        )
        self.assertEqual(membership, Membership.JOIN)