summary refs log tree commit diff
path: root/tests/rest
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-04-24 13:36:35 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-04-24 13:36:35 +0100
commit0ddaae83c3f59b9bca7460a493b7e930c10d6b20 (patch)
tree2524f1948c0974489d281037b96ff4748d3187ba /tests/rest
parentMerge remote-tracking branch 'origin/develop' into hawkowl/cache-config-witho... (diff)
parent1.12.4 (diff)
downloadsynapse-github/anoa/temp_working_cache_config.tar.xz
Merge branch 'release-v1.12.4' of github.com:matrix-org/synapse into anoa/temp_working_cache_config github/anoa/temp_working_cache_config anoa/temp_working_cache_config
* 'release-v1.12.4' of github.com:matrix-org/synapse: (123 commits)
  1.12.4
  formatting for the changelog
  1.12.4rc1
  1.12.4rc1
  Do not treat display names as globs for push rules. (#7271)
  Query missing cross-signing keys on local sig upload (#7289)
  Fix changelog file
  Support GET account_data requests on a worker (#7311)
  Revert "Query missing cross-signing keys on local sig upload"
  Always send the user updates to their own device list (#7160)
  Query missing cross-signing keys on local sig upload
  Only register devices edu handler on the master process (#7255)
  tweak changelog
  1.12.3
  Fix the debian build in a better way. (#7212)
  Fix changelog wording
  1.12.2
  Pin Pillow>=4.3.0,<7.1.0 to fix dep issue
  1.12.1
  Note where bugs were introduced
  ...
Diffstat (limited to 'tests/rest')
-rw-r--r--tests/rest/admin/test_admin.py7
-rw-r--r--tests/rest/admin/test_user.py218
-rw-r--r--tests/rest/client/v1/test_directory.py41
-rw-r--r--tests/rest/client/v1/test_login.py111
-rw-r--r--tests/rest/client/v1/test_rooms.py160
5 files changed, 490 insertions, 47 deletions
diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py

index e5984aaad8..0342aed416 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py
@@ -870,6 +870,13 @@ class RoomTestCase(unittest.HomeserverTestCase): # Set this new alias as the canonical alias for this room self.helper.send_state( room_id, + "m.room.aliases", + {"aliases": [test_alias]}, + tok=self.admin_user_tok, + state_key="test", + ) + self.helper.send_state( + room_id, "m.room.canonical_alias", {"alias": test_alias}, tok=self.admin_user_tok, diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index cbe4a6a51f..6416fb5d2a 100644 --- a/tests/rest/admin/test_user.py +++ b/tests/rest/admin/test_user.py
@@ -16,6 +16,7 @@ import hashlib import hmac import json +import urllib.parse from mock import Mock @@ -371,22 +372,24 @@ class UserRestTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): self.store = hs.get_datastore() - self.url = "/_synapse/admin/v2/users/@bob:test" - self.admin_user = self.register_user("admin", "pass", admin=True) self.admin_user_tok = self.login("admin", "pass") self.other_user = self.register_user("user", "pass") self.other_user_token = self.login("user", "pass") + self.url_other_user = "/_synapse/admin/v2/users/%s" % urllib.parse.quote( + self.other_user + ) def test_requester_is_no_admin(self): """ If the user is not a server admin, an error is returned. """ self.hs.config.registration_shared_secret = None + url = "/_synapse/admin/v2/users/@bob:test" request, channel = self.make_request( - "GET", self.url, access_token=self.other_user_token, + "GET", url, access_token=self.other_user_token, ) self.render(request) @@ -394,7 +397,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual("You are not a server admin", channel.json_body["error"]) request, channel = self.make_request( - "PUT", self.url, access_token=self.other_user_token, content=b"{}", + "PUT", url, access_token=self.other_user_token, content=b"{}", ) self.render(request) @@ -417,24 +420,73 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(404, channel.code, msg=channel.json_body) self.assertEqual("M_NOT_FOUND", channel.json_body["errcode"]) - def test_requester_is_admin(self): + def test_create_server_admin(self): """ - If the user is a server admin, a new user is created. + Check that a new admin user is created successfully. """ self.hs.config.registration_shared_secret = None + url = "/_synapse/admin/v2/users/@bob:test" + # Create user (server admin) body = json.dumps( { "password": "abc123", "admin": True, + "displayname": "Bob's name", "threepids": [{"medium": "email", "address": "bob@bob.bob"}], } ) + request, channel = self.make_request( + "PUT", + url, + access_token=self.admin_user_tok, + content=body.encode(encoding="utf_8"), + ) + self.render(request) + + self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@bob:test", channel.json_body["name"]) + self.assertEqual("Bob's name", channel.json_body["displayname"]) + self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) + self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"]) + self.assertEqual(True, channel.json_body["admin"]) + + # Get user + request, channel = self.make_request( + "GET", url, access_token=self.admin_user_tok, + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@bob:test", channel.json_body["name"]) + self.assertEqual("Bob's name", channel.json_body["displayname"]) + self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) + self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"]) + self.assertEqual(True, channel.json_body["admin"]) + self.assertEqual(False, channel.json_body["is_guest"]) + self.assertEqual(False, channel.json_body["deactivated"]) + + def test_create_user(self): + """ + Check that a new regular user is created successfully. + """ + self.hs.config.registration_shared_secret = None + url = "/_synapse/admin/v2/users/@bob:test" + # Create user + body = json.dumps( + { + "password": "abc123", + "admin": False, + "displayname": "Bob's name", + "threepids": [{"medium": "email", "address": "bob@bob.bob"}], + } + ) + request, channel = self.make_request( "PUT", - self.url, + url, access_token=self.admin_user_tok, content=body.encode(encoding="utf_8"), ) @@ -442,29 +494,38 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(201, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) - self.assertEqual("bob", channel.json_body["displayname"]) + self.assertEqual("Bob's name", channel.json_body["displayname"]) self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"]) + self.assertEqual(False, channel.json_body["admin"]) # Get user request, channel = self.make_request( - "GET", self.url, access_token=self.admin_user_tok, + "GET", url, access_token=self.admin_user_tok, ) self.render(request) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) self.assertEqual("@bob:test", channel.json_body["name"]) - self.assertEqual("bob", channel.json_body["displayname"]) - self.assertEqual(1, channel.json_body["admin"]) - self.assertEqual(0, channel.json_body["is_guest"]) - self.assertEqual(0, channel.json_body["deactivated"]) + self.assertEqual("Bob's name", channel.json_body["displayname"]) + self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) + self.assertEqual("bob@bob.bob", channel.json_body["threepids"][0]["address"]) + self.assertEqual(False, channel.json_body["admin"]) + self.assertEqual(False, channel.json_body["is_guest"]) + self.assertEqual(False, channel.json_body["deactivated"]) + + def test_set_password(self): + """ + Test setting a new password for another user. + """ + self.hs.config.registration_shared_secret = None # Change password body = json.dumps({"password": "hahaha"}) request, channel = self.make_request( "PUT", - self.url, + self.url_other_user, access_token=self.admin_user_tok, content=body.encode(encoding="utf_8"), ) @@ -472,41 +533,133 @@ class UserRestTestCase(unittest.HomeserverTestCase): self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + def test_set_displayname(self): + """ + Test setting the displayname of another user. + """ + self.hs.config.registration_shared_secret = None + # Modify user + body = json.dumps({"displayname": "foobar"}) + + request, channel = self.make_request( + "PUT", + self.url_other_user, + access_token=self.admin_user_tok, + content=body.encode(encoding="utf_8"), + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual("foobar", channel.json_body["displayname"]) + + # Get user + request, channel = self.make_request( + "GET", self.url_other_user, access_token=self.admin_user_tok, + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual("foobar", channel.json_body["displayname"]) + + def test_set_threepid(self): + """ + Test setting threepid for an other user. + """ + self.hs.config.registration_shared_secret = None + + # Delete old and add new threepid to user body = json.dumps( - { - "displayname": "foobar", - "deactivated": True, - "threepids": [{"medium": "email", "address": "bob2@bob.bob"}], - } + {"threepids": [{"medium": "email", "address": "bob3@bob.bob"}]} ) request, channel = self.make_request( "PUT", - self.url, + self.url_other_user, access_token=self.admin_user_tok, content=body.encode(encoding="utf_8"), ) self.render(request) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) - self.assertEqual("@bob:test", channel.json_body["name"]) - self.assertEqual("foobar", channel.json_body["displayname"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) + self.assertEqual("bob3@bob.bob", channel.json_body["threepids"][0]["address"]) + + # Get user + request, channel = self.make_request( + "GET", self.url_other_user, access_token=self.admin_user_tok, + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual("email", channel.json_body["threepids"][0]["medium"]) + self.assertEqual("bob3@bob.bob", channel.json_body["threepids"][0]["address"]) + + def test_deactivate_user(self): + """ + Test deactivating another user. + """ + + # Deactivate user + body = json.dumps({"deactivated": True}) + + request, channel = self.make_request( + "PUT", + self.url_other_user, + access_token=self.admin_user_tok, + content=body.encode(encoding="utf_8"), + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@user:test", channel.json_body["name"]) self.assertEqual(True, channel.json_body["deactivated"]) # the user is deactivated, the threepid will be deleted # Get user request, channel = self.make_request( - "GET", self.url, access_token=self.admin_user_tok, + "GET", self.url_other_user, access_token=self.admin_user_tok, ) self.render(request) self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) - self.assertEqual("@bob:test", channel.json_body["name"]) - self.assertEqual("foobar", channel.json_body["displayname"]) - self.assertEqual(1, channel.json_body["admin"]) - self.assertEqual(0, channel.json_body["is_guest"]) - self.assertEqual(1, channel.json_body["deactivated"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(True, channel.json_body["deactivated"]) + + def test_set_user_as_admin(self): + """ + Test setting the admin flag on a user. + """ + self.hs.config.registration_shared_secret = None + + # Set a user as an admin + body = json.dumps({"admin": True}) + + request, channel = self.make_request( + "PUT", + self.url_other_user, + access_token=self.admin_user_tok, + content=body.encode(encoding="utf_8"), + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(True, channel.json_body["admin"]) + + # Get user + request, channel = self.make_request( + "GET", self.url_other_user, access_token=self.admin_user_tok, + ) + self.render(request) + + self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"]) + self.assertEqual("@user:test", channel.json_body["name"]) + self.assertEqual(True, channel.json_body["admin"]) def test_accidental_deactivation_prevention(self): """ @@ -514,13 +667,14 @@ class UserRestTestCase(unittest.HomeserverTestCase): for the deactivated body parameter """ self.hs.config.registration_shared_secret = None + url = "/_synapse/admin/v2/users/@bob:test" # Create user body = json.dumps({"password": "abc123"}) request, channel = self.make_request( "PUT", - self.url, + url, access_token=self.admin_user_tok, content=body.encode(encoding="utf_8"), ) @@ -532,7 +686,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Get user request, channel = self.make_request( - "GET", self.url, access_token=self.admin_user_tok, + "GET", url, access_token=self.admin_user_tok, ) self.render(request) @@ -546,7 +700,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): request, channel = self.make_request( "PUT", - self.url, + url, access_token=self.admin_user_tok, content=body.encode(encoding="utf_8"), ) @@ -556,7 +710,7 @@ class UserRestTestCase(unittest.HomeserverTestCase): # Check user is not deactivated request, channel = self.make_request( - "GET", self.url, access_token=self.admin_user_tok, + "GET", url, access_token=self.admin_user_tok, ) self.render(request) diff --git a/tests/rest/client/v1/test_directory.py b/tests/rest/client/v1/test_directory.py
index 914cf54927..633b7dbda0 100644 --- a/tests/rest/client/v1/test_directory.py +++ b/tests/rest/client/v1/test_directory.py
@@ -51,30 +51,26 @@ class DirectoryTestCase(unittest.HomeserverTestCase): self.user = self.register_user("user", "test") self.user_tok = self.login("user", "test") - def test_cannot_set_alias_via_state_event(self): - self.ensure_user_joined_room() - url = "/_matrix/client/r0/rooms/%s/state/m.room.aliases/%s" % ( - self.room_id, - self.hs.hostname, - ) - - data = {"aliases": [self.random_alias(5)]} - request_data = json.dumps(data) - - request, channel = self.make_request( - "PUT", url, request_data, access_token=self.user_tok - ) - self.render(request) - self.assertEqual(channel.code, 400, channel.result) + def test_state_event_not_in_room(self): + self.ensure_user_left_room() + self.set_alias_via_state_event(403) def test_directory_endpoint_not_in_room(self): self.ensure_user_left_room() self.set_alias_via_directory(403) + def test_state_event_in_room_too_long(self): + self.ensure_user_joined_room() + self.set_alias_via_state_event(400, alias_length=256) + def test_directory_in_room_too_long(self): self.ensure_user_joined_room() self.set_alias_via_directory(400, alias_length=256) + def test_state_event_in_room(self): + self.ensure_user_joined_room() + self.set_alias_via_state_event(200) + def test_directory_in_room(self): self.ensure_user_joined_room() self.set_alias_via_directory(200) @@ -106,6 +102,21 @@ class DirectoryTestCase(unittest.HomeserverTestCase): self.render(request) self.assertEqual(channel.code, 200, channel.result) + def set_alias_via_state_event(self, expected_code, alias_length=5): + url = "/_matrix/client/r0/rooms/%s/state/m.room.aliases/%s" % ( + self.room_id, + self.hs.hostname, + ) + + data = {"aliases": [self.random_alias(alias_length)]} + request_data = json.dumps(data) + + request, channel = self.make_request( + "PUT", url, request_data, access_token=self.user_tok + ) + self.render(request) + self.assertEqual(channel.code, expected_code, channel.result) + def set_alias_via_directory(self, expected_code, alias_length=5): url = "/_matrix/client/r0/directory/room/%s" % self.random_alias(alias_length) data = {"room_id": self.room_id} diff --git a/tests/rest/client/v1/test_login.py b/tests/rest/client/v1/test_login.py
index eae5411325..da2c9bfa1e 100644 --- a/tests/rest/client/v1/test_login.py +++ b/tests/rest/client/v1/test_login.py
@@ -1,4 +1,7 @@ import json +import urllib.parse + +from mock import Mock import synapse.rest.admin from synapse.rest.client.v1 import login @@ -252,3 +255,111 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): ) self.render(request) self.assertEquals(channel.code, 200, channel.result) + + +class CASRedirectConfirmTestCase(unittest.HomeserverTestCase): + + servlets = [ + login.register_servlets, + ] + + def make_homeserver(self, reactor, clock): + self.base_url = "https://matrix.goodserver.com/" + self.redirect_path = "_synapse/client/login/sso/redirect/confirm" + + config = self.default_config() + config["cas_config"] = { + "enabled": True, + "server_url": "https://fake.test", + "service_url": "https://matrix.goodserver.com:8448", + } + + async def get_raw(uri, args): + """Return an example response payload from a call to the `/proxyValidate` + endpoint of a CAS server, copied from + https://apereo.github.io/cas/5.0.x/protocol/CAS-Protocol-V2-Specification.html#26-proxyvalidate-cas-20 + + This needs to be returned by an async function (as opposed to set as the + mock's return value) because the corresponding Synapse code awaits on it. + """ + return """ + <cas:serviceResponse xmlns:cas='http://www.yale.edu/tp/cas'> + <cas:authenticationSuccess> + <cas:user>username</cas:user> + <cas:proxyGrantingTicket>PGTIOU-84678-8a9d...</cas:proxyGrantingTicket> + <cas:proxies> + <cas:proxy>https://proxy2/pgtUrl</cas:proxy> + <cas:proxy>https://proxy1/pgtUrl</cas:proxy> + </cas:proxies> + </cas:authenticationSuccess> + </cas:serviceResponse> + """ + + mocked_http_client = Mock(spec=["get_raw"]) + mocked_http_client.get_raw.side_effect = get_raw + + self.hs = self.setup_test_homeserver( + config=config, proxied_http_client=mocked_http_client, + ) + + return self.hs + + def test_cas_redirect_confirm(self): + """Tests that the SSO login flow serves a confirmation page before redirecting a + user to the redirect URL. + """ + base_url = "/_matrix/client/r0/login/cas/ticket?redirectUrl" + redirect_url = "https://dodgy-site.com/" + + url_parts = list(urllib.parse.urlparse(base_url)) + query = dict(urllib.parse.parse_qsl(url_parts[4])) + query.update({"redirectUrl": redirect_url}) + query.update({"ticket": "ticket"}) + url_parts[4] = urllib.parse.urlencode(query) + cas_ticket_url = urllib.parse.urlunparse(url_parts) + + # Get Synapse to call the fake CAS and serve the template. + request, channel = self.make_request("GET", cas_ticket_url) + self.render(request) + + # Test that the response is HTML. + self.assertEqual(channel.code, 200) + content_type_header_value = "" + for header in channel.result.get("headers", []): + if header[0] == b"Content-Type": + content_type_header_value = header[1].decode("utf8") + + self.assertTrue(content_type_header_value.startswith("text/html")) + + # Test that the body isn't empty. + self.assertTrue(len(channel.result["body"]) > 0) + + # And that it contains our redirect link + self.assertIn(redirect_url, channel.result["body"].decode("UTF-8")) + + @override_config( + { + "sso": { + "client_whitelist": [ + "https://legit-site.com/", + "https://other-site.com/", + ] + } + } + ) + def test_cas_redirect_whitelisted(self): + """Tests that the SSO login flow serves a redirect to a whitelisted url + """ + redirect_url = "https://legit-site.com/" + cas_ticket_url = ( + "/_matrix/client/r0/login/cas/ticket?redirectUrl=%s&ticket=ticket" + % (urllib.parse.quote(redirect_url)) + ) + + # Get Synapse to call the fake CAS and serve the template. + request, channel = self.make_request("GET", cas_ticket_url) + self.render(request) + + self.assertEqual(channel.code, 302) + location_headers = channel.headers.getRawHeaders("Location") + self.assertEqual(location_headers[0][: len(redirect_url)], redirect_url) diff --git a/tests/rest/client/v1/test_rooms.py b/tests/rest/client/v1/test_rooms.py
index 2f3df5f88f..7dd86d0c27 100644 --- a/tests/rest/client/v1/test_rooms.py +++ b/tests/rest/client/v1/test_rooms.py
@@ -1821,3 +1821,163 @@ class RoomAliasListTestCase(unittest.HomeserverTestCase): ) self.render(request) self.assertEqual(channel.code, expected_code, channel.result) + + +class RoomCanonicalAliasTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets_for_client_rest_resource, + directory.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def prepare(self, reactor, clock, homeserver): + self.room_owner = self.register_user("room_owner", "test") + self.room_owner_tok = self.login("room_owner", "test") + + self.room_id = self.helper.create_room_as( + self.room_owner, tok=self.room_owner_tok + ) + + self.alias = "#alias:test" + self._set_alias_via_directory(self.alias) + + def _set_alias_via_directory(self, alias: str, expected_code: int = 200): + url = "/_matrix/client/r0/directory/room/" + alias + data = {"room_id": self.room_id} + request_data = json.dumps(data) + + request, channel = self.make_request( + "PUT", url, request_data, access_token=self.room_owner_tok + ) + self.render(request) + self.assertEqual(channel.code, expected_code, channel.result) + + def _get_canonical_alias(self, expected_code: int = 200) -> JsonDict: + """Calls the endpoint under test. returns the json response object.""" + request, channel = self.make_request( + "GET", + "rooms/%s/state/m.room.canonical_alias" % (self.room_id,), + access_token=self.room_owner_tok, + ) + self.render(request) + self.assertEqual(channel.code, expected_code, channel.result) + res = channel.json_body + self.assertIsInstance(res, dict) + return res + + def _set_canonical_alias(self, content: str, expected_code: int = 200) -> JsonDict: + """Calls the endpoint under test. returns the json response object.""" + request, channel = self.make_request( + "PUT", + "rooms/%s/state/m.room.canonical_alias" % (self.room_id,), + json.dumps(content), + access_token=self.room_owner_tok, + ) + self.render(request) + self.assertEqual(channel.code, expected_code, channel.result) + res = channel.json_body + self.assertIsInstance(res, dict) + return res + + def test_canonical_alias(self): + """Test a basic alias message.""" + # There is no canonical alias to start with. + self._get_canonical_alias(expected_code=404) + + # Create an alias. + self._set_canonical_alias({"alias": self.alias}) + + # Canonical alias now exists! + res = self._get_canonical_alias() + self.assertEqual(res, {"alias": self.alias}) + + # Now remove the alias. + self._set_canonical_alias({}) + + # There is an alias event, but it is empty. + res = self._get_canonical_alias() + self.assertEqual(res, {}) + + def test_alt_aliases(self): + """Test a canonical alias message with alt_aliases.""" + # Create an alias. + self._set_canonical_alias({"alt_aliases": [self.alias]}) + + # Canonical alias now exists! + res = self._get_canonical_alias() + self.assertEqual(res, {"alt_aliases": [self.alias]}) + + # Now remove the alt_aliases. + self._set_canonical_alias({}) + + # There is an alias event, but it is empty. + res = self._get_canonical_alias() + self.assertEqual(res, {}) + + def test_alias_alt_aliases(self): + """Test a canonical alias message with an alias and alt_aliases.""" + # Create an alias. + self._set_canonical_alias({"alias": self.alias, "alt_aliases": [self.alias]}) + + # Canonical alias now exists! + res = self._get_canonical_alias() + self.assertEqual(res, {"alias": self.alias, "alt_aliases": [self.alias]}) + + # Now remove the alias and alt_aliases. + self._set_canonical_alias({}) + + # There is an alias event, but it is empty. + res = self._get_canonical_alias() + self.assertEqual(res, {}) + + def test_partial_modify(self): + """Test removing only the alt_aliases.""" + # Create an alias. + self._set_canonical_alias({"alias": self.alias, "alt_aliases": [self.alias]}) + + # Canonical alias now exists! + res = self._get_canonical_alias() + self.assertEqual(res, {"alias": self.alias, "alt_aliases": [self.alias]}) + + # Now remove the alt_aliases. + self._set_canonical_alias({"alias": self.alias}) + + # There is an alias event, but it is empty. + res = self._get_canonical_alias() + self.assertEqual(res, {"alias": self.alias}) + + def test_add_alias(self): + """Test removing only the alt_aliases.""" + # Create an additional alias. + second_alias = "#second:test" + self._set_alias_via_directory(second_alias) + + # Add the canonical alias. + self._set_canonical_alias({"alias": self.alias, "alt_aliases": [self.alias]}) + + # Then add the second alias. + self._set_canonical_alias( + {"alias": self.alias, "alt_aliases": [self.alias, second_alias]} + ) + + # Canonical alias now exists! + res = self._get_canonical_alias() + self.assertEqual( + res, {"alias": self.alias, "alt_aliases": [self.alias, second_alias]} + ) + + def test_bad_data(self): + """Invalid data for alt_aliases should cause errors.""" + self._set_canonical_alias({"alt_aliases": "@bad:test"}, expected_code=400) + self._set_canonical_alias({"alt_aliases": None}, expected_code=400) + self._set_canonical_alias({"alt_aliases": 0}, expected_code=400) + self._set_canonical_alias({"alt_aliases": 1}, expected_code=400) + self._set_canonical_alias({"alt_aliases": False}, expected_code=400) + self._set_canonical_alias({"alt_aliases": True}, expected_code=400) + self._set_canonical_alias({"alt_aliases": {}}, expected_code=400) + + def test_bad_alias(self): + """An alias which does not point to the room raises a SynapseError.""" + self._set_canonical_alias({"alias": "@unknown:test"}, expected_code=400) + self._set_canonical_alias({"alt_aliases": ["@unknown:test"]}, expected_code=400)