summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/rulecheck/test_domainrulecheck.py196
1 files changed, 166 insertions, 30 deletions
diff --git a/tests/rulecheck/test_domainrulecheck.py b/tests/rulecheck/test_domainrulecheck.py

index 702862f78b..de89f95e3c 100644 --- a/tests/rulecheck/test_domainrulecheck.py +++ b/tests/rulecheck/test_domainrulecheck.py
@@ -14,29 +14,35 @@ # limitations under the License. +import json + from synapse.config._base import ConfigError +from synapse.rest.client.v1 import admin, login, room from synapse.rulecheck.domain_rule_checker import DomainRuleChecker from tests import unittest +from tests.server import make_request, render class DomainRuleCheckerTestCase(unittest.TestCase): - def test_allowed(self): config = { "default": False, "domain_mapping": { "source_one": ["target_one", "target_two"], - "source_two": ["target_two"] - } + "source_two": ["target_two"], + }, } check = DomainRuleChecker(config) - self.assertTrue(check.user_may_invite("test:source_one", - "test:target_one", "room")) - self.assertTrue(check.user_may_invite("test:source_one", - "test:target_two", "room")) - self.assertTrue(check.user_may_invite("test:source_two", - "test:target_two", "room")) + self.assertTrue( + check.user_may_invite("test:source_one", "test:target_one", "room", False) + ) + self.assertTrue( + check.user_may_invite("test:source_one", "test:target_two", "room", False) + ) + self.assertTrue( + check.user_may_invite("test:source_two", "test:target_two", "room", False) + ) def test_disallowed(self): config = { @@ -44,50 +50,56 @@ class DomainRuleCheckerTestCase(unittest.TestCase): "domain_mapping": { "source_one": ["target_one", "target_two"], "source_two": ["target_two"], - "source_four": [] - } + "source_four": [], + }, } check = DomainRuleChecker(config) - self.assertFalse(check.user_may_invite("test:source_one", - "test:target_three", "room")) - self.assertFalse(check.user_may_invite("test:source_two", - "test:target_three", "room")) - self.assertFalse(check.user_may_invite("test:source_two", - "test:target_one", "room")) - self.assertFalse(check.user_may_invite("test:source_four", - "test:target_one", "room")) + self.assertFalse( + check.user_may_invite("test:source_one", "test:target_three", "room", False) + ) + self.assertFalse( + check.user_may_invite("test:source_two", "test:target_three", "room", False) + ) + self.assertFalse( + check.user_may_invite("test:source_two", "test:target_one", "room", False) + ) + self.assertFalse( + check.user_may_invite("test:source_four", "test:target_one", "room", False) + ) def test_default_allow(self): config = { "default": True, "domain_mapping": { "source_one": ["target_one", "target_two"], - "source_two": ["target_two"] - } + "source_two": ["target_two"], + }, } check = DomainRuleChecker(config) - self.assertTrue(check.user_may_invite("test:source_three", - "test:target_one", "room")) + self.assertTrue( + check.user_may_invite("test:source_three", "test:target_one", "room", False) + ) def test_default_deny(self): config = { "default": False, "domain_mapping": { "source_one": ["target_one", "target_two"], - "source_two": ["target_two"] - } + "source_two": ["target_two"], + }, } check = DomainRuleChecker(config) - self.assertFalse(check.user_may_invite("test:source_three", - "test:target_one", "room")) + self.assertFalse( + check.user_may_invite("test:source_three", "test:target_one", "room", False) + ) def test_config_parse(self): config = { "default": False, "domain_mapping": { "source_one": ["target_one", "target_two"], - "source_two": ["target_two"] - } + "source_two": ["target_two"], + }, } self.assertEquals(config, DomainRuleChecker.parse_config(config)) @@ -95,7 +107,131 @@ class DomainRuleCheckerTestCase(unittest.TestCase): config = { "domain_mapping": { "source_one": ["target_one", "target_two"], - "source_two": ["target_two"] + "source_two": ["target_two"], } } self.assertRaises(ConfigError, DomainRuleChecker.parse_config, config) + + +class DomainRuleCheckerRoomTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + hijack_auth = False + + def make_homeserver(self, reactor, clock): + config = self.default_config() + + config.spam_checker = (DomainRuleChecker, { + "default": True, + "domain_mapping": {}, + "can_only_join_rooms_with_invite": True, + "can_only_create_one_to_one_rooms": True, + "can_only_invite_during_room_creation": True, + }) + + hs = self.setup_test_homeserver(config=config) + return hs + + def prepare(self, reactor, clock, hs): + self.admin_user_id = self.register_user("admin_user", "pass", admin=True) + self.admin_access_token = self.login("admin_user", "pass") + + self.normal_user_id = self.register_user("normal_user", "pass", admin=False) + self.normal_access_token = self.login("normal_user", "pass") + + self.other_user_id = self.register_user("other_user", "pass", admin=False) + + def test_admin_can_create_room(self): + channel = self._create_room(self.admin_access_token) + assert channel.result["code"] == b"200", channel.result + + def test_normal_user_cannot_create_empty_room(self): + channel = self._create_room(self.normal_access_token) + assert channel.result["code"] == b"403", channel.result + + def test_normal_user_cannot_create_room_with_multiple_invites(self): + channel = self._create_room(self.normal_access_token, content={ + "invite": [self.other_user_id, self.admin_user_id], + }) + assert channel.result["code"] == b"403", channel.result + + def test_normal_user_can_room_with_single_invites(self): + channel = self._create_room(self.normal_access_token, content={ + "invite": [self.other_user_id], + }) + assert channel.result["code"] == b"200", channel.result + + def test_cannot_join_public_room(self): + channel = self._create_room(self.admin_access_token) + assert channel.result["code"] == b"200", channel.result + + room_id = channel.json_body["room_id"] + + self.helper.join( + room_id, self.normal_user_id, + tok=self.normal_access_token, + expect_code=403, + ) + + def test_can_join_invited_room(self): + channel = self._create_room(self.admin_access_token) + assert channel.result["code"] == b"200", channel.result + + room_id = channel.json_body["room_id"] + + self.helper.invite( + room_id, + src=self.admin_user_id, + targ=self.normal_user_id, + tok=self.admin_access_token, + ) + + self.helper.join( + room_id, self.normal_user_id, + tok=self.normal_access_token, + expect_code=200, + ) + + def test_cannot_invite(self): + channel = self._create_room(self.admin_access_token) + assert channel.result["code"] == b"200", channel.result + + room_id = channel.json_body["room_id"] + + self.helper.invite( + room_id, + src=self.admin_user_id, + targ=self.normal_user_id, + tok=self.admin_access_token, + ) + + self.helper.join( + room_id, self.normal_user_id, + tok=self.normal_access_token, + expect_code=200, + ) + + self.helper.invite( + room_id, + src=self.normal_user_id, + targ=self.other_user_id, + tok=self.normal_access_token, + expect_code=403, + ) + + def _create_room(self, token, content={}): + path = "/_matrix/client/r0/createRoom?access_token=%s" % ( + token, + ) + + request, channel = make_request( + self.hs.get_reactor(), "POST", path, + content=json.dumps(content).encode("utf8"), + ) + render(request, self.resource, self.hs.get_reactor()) + + return channel