summary refs log tree commit diff
path: root/tests/handlers/test_user_directory.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/handlers/test_user_directory.py')
-rw-r--r--tests/handlers/test_user_directory.py119
1 files changed, 104 insertions, 15 deletions
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py

index c15bce5bef..87be94111f 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py
@@ -17,12 +17,13 @@ from mock import Mock from twisted.internet import defer import synapse.rest.admin -from synapse.api.constants import UserTypes +from synapse.api.constants import EventTypes, RoomEncryptionAlgorithms, UserTypes from synapse.rest.client.v1 import login, room from synapse.rest.client.v2_alpha import user_directory from synapse.storage.roommember import ProfileInfo from tests import unittest +from tests.unittest import override_config class UserDirectoryTestCase(unittest.HomeserverTestCase): @@ -147,9 +148,97 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): s = self.get_success(self.handler.search_users(u1, "user3", 10)) self.assertEqual(len(s["results"]), 0) + @override_config({"encryption_enabled_by_default_for_room_type": "all"}) + def test_encrypted_by_default_config_option_all(self): + """Tests that invite-only and non-invite-only rooms have encryption enabled by + default when the config option encryption_enabled_by_default_for_room_type is "all". + """ + # Create a user + user = self.register_user("user", "pass") + user_token = self.login(user, "pass") + + # Create an invite-only room as that user + room_id = self.helper.create_room_as(user, is_public=False, tok=user_token) + + # Check that the room has an encryption state event + event_content = self.helper.get_state( + room_id=room_id, event_type=EventTypes.RoomEncryption, tok=user_token, + ) + self.assertEqual(event_content, {"algorithm": RoomEncryptionAlgorithms.DEFAULT}) + + # Create a non invite-only room as that user + room_id = self.helper.create_room_as(user, is_public=True, tok=user_token) + + # Check that the room has an encryption state event + event_content = self.helper.get_state( + room_id=room_id, event_type=EventTypes.RoomEncryption, tok=user_token, + ) + self.assertEqual(event_content, {"algorithm": RoomEncryptionAlgorithms.DEFAULT}) + + @override_config({"encryption_enabled_by_default_for_room_type": "invite"}) + def test_encrypted_by_default_config_option_invite(self): + """Tests that only new, invite-only rooms have encryption enabled by default when + the config option encryption_enabled_by_default_for_room_type is "invite". + """ + # Create a user + user = self.register_user("user", "pass") + user_token = self.login(user, "pass") + + # Create an invite-only room as that user + room_id = self.helper.create_room_as(user, is_public=False, tok=user_token) + + # Check that the room has an encryption state event + event_content = self.helper.get_state( + room_id=room_id, event_type=EventTypes.RoomEncryption, tok=user_token, + ) + self.assertEqual(event_content, {"algorithm": RoomEncryptionAlgorithms.DEFAULT}) + + # Create a non invite-only room as that user + room_id = self.helper.create_room_as(user, is_public=True, tok=user_token) + + # Check that the room does not have an encryption state event + self.helper.get_state( + room_id=room_id, + event_type=EventTypes.RoomEncryption, + tok=user_token, + expect_code=404, + ) + + @override_config({"encryption_enabled_by_default_for_room_type": "off"}) + def test_encrypted_by_default_config_option_off(self): + """Tests that neither new invite-only nor non-invite-only rooms have encryption + enabled by default when the config option + encryption_enabled_by_default_for_room_type is "off". + """ + # Create a user + user = self.register_user("user", "pass") + user_token = self.login(user, "pass") + + # Create an invite-only room as that user + room_id = self.helper.create_room_as(user, is_public=False, tok=user_token) + + # Check that the room does not have an encryption state event + self.helper.get_state( + room_id=room_id, + event_type=EventTypes.RoomEncryption, + tok=user_token, + expect_code=404, + ) + + # Create a non invite-only room as that user + room_id = self.helper.create_room_as(user, is_public=True, tok=user_token) + + # Check that the room does not have an encryption state event + self.helper.get_state( + room_id=room_id, + event_type=EventTypes.RoomEncryption, + tok=user_token, + expect_code=404, + ) + def test_spam_checker(self): """ - A user which fails to the spam checks will not appear in search results. + A user which fails the spam checks will not appear in search results. """ u1 = self.register_user("user1", "pass") u1_token = self.login(u1, "pass") @@ -180,7 +269,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): # Configure a spam checker that does not filter any users. spam_checker = self.hs.get_spam_checker() - class AllowAll(object): + class AllowAll: def check_username_for_spam(self, user_profile): # Allow all users. return False @@ -193,7 +282,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self.assertEqual(len(s["results"]), 1) # Configure a spam checker that filters all users. - class BlockAll(object): + class BlockAll: def check_username_for_spam(self, user_profile): # All users are spammy. return True @@ -250,7 +339,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): def get_users_in_public_rooms(self): r = self.get_success( - self.store.db.simple_select_list( + self.store.db_pool.simple_select_list( "users_in_public_rooms", None, ("user_id", "room_id") ) ) @@ -261,7 +350,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): def get_users_who_share_private_rooms(self): return self.get_success( - self.store.db.simple_select_list( + self.store.db_pool.simple_select_list( "users_who_share_private_rooms", None, ["user_id", "other_user_id", "room_id"], @@ -273,10 +362,10 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): Add the background updates we need to run. """ # Ugh, have to reset this flag - self.store.db.updates._all_done = False + self.store.db_pool.updates._all_done = False self.get_success( - self.store.db.simple_insert( + self.store.db_pool.simple_insert( "background_updates", { "update_name": "populate_user_directory_createtables", @@ -285,7 +374,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): ) ) self.get_success( - self.store.db.simple_insert( + self.store.db_pool.simple_insert( "background_updates", { "update_name": "populate_user_directory_process_rooms", @@ -295,7 +384,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): ) ) self.get_success( - self.store.db.simple_insert( + self.store.db_pool.simple_insert( "background_updates", { "update_name": "populate_user_directory_process_users", @@ -305,7 +394,7 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): ) ) self.get_success( - self.store.db.simple_insert( + self.store.db_pool.simple_insert( "background_updates", { "update_name": "populate_user_directory_cleanup", @@ -348,10 +437,10 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self._add_background_updates() while not self.get_success( - self.store.db.updates.has_completed_background_updates() + self.store.db_pool.updates.has_completed_background_updates() ): self.get_success( - self.store.db.updates.do_next_background_update(100), by=0.1 + self.store.db_pool.updates.do_next_background_update(100), by=0.1 ) shares_private = self.get_users_who_share_private_rooms() @@ -387,10 +476,10 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): self._add_background_updates() while not self.get_success( - self.store.db.updates.has_completed_background_updates() + self.store.db_pool.updates.has_completed_background_updates() ): self.get_success( - self.store.db.updates.do_next_background_update(100), by=0.1 + self.store.db_pool.updates.do_next_background_update(100), by=0.1 ) shares_private = self.get_users_who_share_private_rooms()