diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py
index c3facc00eb..45a9d445f8 100644
--- a/tests/rest/client/v2_alpha/test_account.py
+++ b/tests/rest/client/v2_alpha/test_account.py
@@ -24,6 +24,7 @@ import pkg_resources
import synapse.rest.admin
from synapse.api.constants import LoginType, Membership
+from synapse.api.errors import Codes
from synapse.rest.client.v1 import login, room
from synapse.rest.client.v2_alpha import account, register
@@ -325,3 +326,304 @@ class DeactivateTestCase(unittest.HomeserverTestCase):
)
self.render(request)
self.assertEqual(request.code, 200)
+
+
+class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
+
+ servlets = [
+ account.register_servlets,
+ login.register_servlets,
+ synapse.rest.admin.register_servlets_for_client_rest_resource,
+ ]
+
+ def make_homeserver(self, reactor, clock):
+ config = self.default_config()
+
+ # Email config.
+ self.email_attempts = []
+
+ def sendmail(smtphost, from_addr, to_addrs, msg, **kwargs):
+ self.email_attempts.append(msg)
+
+ config["email"] = {
+ "enable_notifs": False,
+ "template_dir": os.path.abspath(
+ pkg_resources.resource_filename("synapse", "res/templates")
+ ),
+ "smtp_host": "127.0.0.1",
+ "smtp_port": 20,
+ "require_transport_security": False,
+ "smtp_user": None,
+ "smtp_pass": None,
+ "notif_from": "test@example.com",
+ }
+ config["public_baseurl"] = "https://example.com"
+
+ self.hs = self.setup_test_homeserver(config=config, sendmail=sendmail)
+ return self.hs
+
+ def prepare(self, reactor, clock, hs):
+ self.store = hs.get_datastore()
+
+ self.user_id = self.register_user("kermit", "test")
+ self.user_id_tok = self.login("kermit", "test")
+ self.email = "test@example.com"
+ self.url_3pid = b"account/3pid"
+
+ def test_add_email(self):
+ """Test adding an email to profile
+ """
+ client_secret = "foobar"
+ session_id = self._request_token(self.email, client_secret)
+
+ self.assertEquals(len(self.email_attempts), 1)
+ link = self._get_link_from_email()
+
+ self._validate_token(link)
+
+ request, channel = self.make_request(
+ "POST",
+ b"/_matrix/client/unstable/account/3pid/add",
+ {
+ "client_secret": client_secret,
+ "sid": session_id,
+ "auth": {
+ "type": "m.login.password",
+ "user": self.user_id,
+ "password": "test",
+ },
+ },
+ access_token=self.user_id_tok,
+ )
+
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
+ self.assertEqual(self.email, channel.json_body["threepids"][0]["address"])
+
+ def test_add_email_if_disabled(self):
+ """Test adding email to profile when doing so is disallowed
+ """
+ self.hs.config.enable_3pid_changes = False
+
+ client_secret = "foobar"
+ session_id = self._request_token(self.email, client_secret)
+
+ self.assertEquals(len(self.email_attempts), 1)
+ link = self._get_link_from_email()
+
+ self._validate_token(link)
+
+ request, channel = self.make_request(
+ "POST",
+ b"/_matrix/client/unstable/account/3pid/add",
+ {
+ "client_secret": client_secret,
+ "sid": session_id,
+ "auth": {
+ "type": "m.login.password",
+ "user": self.user_id,
+ "password": "test",
+ },
+ },
+ access_token=self.user_id_tok,
+ )
+ self.render(request)
+ self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertFalse(channel.json_body["threepids"])
+
+ def test_delete_email(self):
+ """Test deleting an email from profile
+ """
+ # Add a threepid
+ self.get_success(
+ self.store.user_add_threepid(
+ user_id=self.user_id,
+ medium="email",
+ address=self.email,
+ validated_at=0,
+ added_at=0,
+ )
+ )
+
+ request, channel = self.make_request(
+ "POST",
+ b"account/3pid/delete",
+ {"medium": "email", "address": self.email},
+ access_token=self.user_id_tok,
+ )
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertFalse(channel.json_body["threepids"])
+
+ def test_delete_email_if_disabled(self):
+ """Test deleting an email from profile when disallowed
+ """
+ self.hs.config.enable_3pid_changes = False
+
+ # Add a threepid
+ self.get_success(
+ self.store.user_add_threepid(
+ user_id=self.user_id,
+ medium="email",
+ address=self.email,
+ validated_at=0,
+ added_at=0,
+ )
+ )
+
+ request, channel = self.make_request(
+ "POST",
+ b"account/3pid/delete",
+ {"medium": "email", "address": self.email},
+ access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.FORBIDDEN, channel.json_body["errcode"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
+ self.assertEqual(self.email, channel.json_body["threepids"][0]["address"])
+
+ def test_cant_add_email_without_clicking_link(self):
+ """Test that we do actually need to click the link in the email
+ """
+ client_secret = "foobar"
+ session_id = self._request_token(self.email, client_secret)
+
+ self.assertEquals(len(self.email_attempts), 1)
+
+ # Attempt to add email without clicking the link
+ request, channel = self.make_request(
+ "POST",
+ b"/_matrix/client/unstable/account/3pid/add",
+ {
+ "client_secret": client_secret,
+ "sid": session_id,
+ "auth": {
+ "type": "m.login.password",
+ "user": self.user_id,
+ "password": "test",
+ },
+ },
+ access_token=self.user_id_tok,
+ )
+ self.render(request)
+ self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.THREEPID_AUTH_FAILED, channel.json_body["errcode"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertFalse(channel.json_body["threepids"])
+
+ def test_no_valid_token(self):
+ """Test that we do actually need to request a token and can't just
+ make a session up.
+ """
+ client_secret = "foobar"
+ session_id = "weasle"
+
+ # Attempt to add email without even requesting an email
+ request, channel = self.make_request(
+ "POST",
+ b"/_matrix/client/unstable/account/3pid/add",
+ {
+ "client_secret": client_secret,
+ "sid": session_id,
+ "auth": {
+ "type": "m.login.password",
+ "user": self.user_id,
+ "password": "test",
+ },
+ },
+ access_token=self.user_id_tok,
+ )
+ self.render(request)
+ self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(Codes.THREEPID_AUTH_FAILED, channel.json_body["errcode"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertFalse(channel.json_body["threepids"])
+
+ def _request_token(self, email, client_secret):
+ request, channel = self.make_request(
+ "POST",
+ b"account/3pid/email/requestToken",
+ {"client_secret": client_secret, "email": email, "send_attempt": 1},
+ )
+ self.render(request)
+ self.assertEquals(200, channel.code, channel.result)
+
+ return channel.json_body["sid"]
+
+ def _validate_token(self, link):
+ # Remove the host
+ path = link.replace("https://example.com", "")
+
+ request, channel = self.make_request("GET", path, shorthand=False)
+ self.render(request)
+ self.assertEquals(200, channel.code, channel.result)
+
+ def _get_link_from_email(self):
+ assert self.email_attempts, "No emails have been sent"
+
+ raw_msg = self.email_attempts[-1].decode("UTF-8")
+ mail = Parser().parsestr(raw_msg)
+
+ text = None
+ for part in mail.walk():
+ if part.get_content_type() == "text/plain":
+ text = part.get_payload(decode=True).decode("UTF-8")
+ break
+
+ if not text:
+ self.fail("Could not find text portion of email to parse")
+
+ match = re.search(r"https://example.com\S+", text)
+ assert match, "Could not find link in email"
+
+ return match.group(0)
|