diff --git a/tests/http/test_additional_resource.py b/tests/http/test_additional_resource.py
new file mode 100644
index 0000000000..62d36c2906
--- /dev/null
+++ b/tests/http/test_additional_resource.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from synapse.http.additional_resource import AdditionalResource
+from synapse.http.server import respond_with_json
+
+from tests.unittest import HomeserverTestCase
+
+
+class _AsyncTestCustomEndpoint:
+ def __init__(self, config, module_api):
+ pass
+
+ async def handle_request(self, request):
+ respond_with_json(request, 200, {"some_key": "some_value_async"})
+
+
+class _SyncTestCustomEndpoint:
+ def __init__(self, config, module_api):
+ pass
+
+ async def handle_request(self, request):
+ respond_with_json(request, 200, {"some_key": "some_value_sync"})
+
+
+class AdditionalResourceTests(HomeserverTestCase):
+ """Very basic tests that `AdditionalResource` works correctly with sync
+ and async handlers.
+ """
+
+ def test_async(self):
+ handler = _AsyncTestCustomEndpoint({}, None).handle_request
+ self.resource = AdditionalResource(self.hs, handler)
+
+ request, channel = self.make_request("GET", "/")
+ self.render(request)
+
+ self.assertEqual(request.code, 200)
+ self.assertEqual(channel.json_body, {"some_key": "some_value_async"})
+
+ def test_sync(self):
+ handler = _SyncTestCustomEndpoint({}, None).handle_request
+ self.resource = AdditionalResource(self.hs, handler)
+
+ request, channel = self.make_request("GET", "/")
+ self.render(request)
+
+ self.assertEqual(request.code, 200)
+ self.assertEqual(channel.json_body, {"some_key": "some_value_sync"})
diff --git a/tests/push/test_http.py b/tests/push/test_http.py
index baf9c785f4..b567868b02 100644
--- a/tests/push/test_http.py
+++ b/tests/push/test_http.py
@@ -25,7 +25,6 @@ from tests.unittest import HomeserverTestCase
class HTTPPusherTests(HomeserverTestCase):
-
servlets = [
synapse.rest.admin.register_servlets_for_client_rest_resource,
room.register_servlets,
@@ -35,7 +34,6 @@ class HTTPPusherTests(HomeserverTestCase):
hijack_auth = False
def make_homeserver(self, reactor, clock):
-
self.push_attempts = []
m = Mock()
@@ -90,9 +88,6 @@ class HTTPPusherTests(HomeserverTestCase):
# Create a room
room = self.helper.create_room_as(user_id, tok=access_token)
- # Invite the other person
- self.helper.invite(room=room, src=user_id, tok=access_token, targ=other_user_id)
-
# The other user joins
self.helper.join(room=room, user=other_user_id, tok=other_access_token)
@@ -157,3 +152,350 @@ class HTTPPusherTests(HomeserverTestCase):
pushers = list(pushers)
self.assertEqual(len(pushers), 1)
self.assertTrue(pushers[0]["last_stream_ordering"] > last_stream_ordering)
+
+ def test_sends_high_priority_for_encrypted(self):
+ """
+ The HTTP pusher will send pushes at high priority if they correspond
+ to an encrypted message.
+ This will happen both in 1:1 rooms and larger rooms.
+ """
+ # Register the user who gets notified
+ user_id = self.register_user("user", "pass")
+ access_token = self.login("user", "pass")
+
+ # Register the user who sends the message
+ other_user_id = self.register_user("otheruser", "pass")
+ other_access_token = self.login("otheruser", "pass")
+
+ # Register a third user
+ yet_another_user_id = self.register_user("yetanotheruser", "pass")
+ yet_another_access_token = self.login("yetanotheruser", "pass")
+
+ # Create a room
+ room = self.helper.create_room_as(user_id, tok=access_token)
+
+ # The other user joins
+ self.helper.join(room=room, user=other_user_id, tok=other_access_token)
+
+ # Register the pusher
+ user_tuple = self.get_success(
+ self.hs.get_datastore().get_user_by_access_token(access_token)
+ )
+ token_id = user_tuple["token_id"]
+
+ self.get_success(
+ self.hs.get_pusherpool().add_pusher(
+ user_id=user_id,
+ access_token=token_id,
+ kind="http",
+ app_id="m.http",
+ app_display_name="HTTP Push Notifications",
+ device_display_name="pushy push",
+ pushkey="a@example.com",
+ lang=None,
+ data={"url": "example.com"},
+ )
+ )
+
+ # Send an encrypted event
+ # I know there'd normally be set-up of an encrypted room first
+ # but this will do for our purposes
+ self.helper.send_event(
+ room,
+ "m.room.encrypted",
+ content={
+ "algorithm": "m.megolm.v1.aes-sha2",
+ "sender_key": "6lImKbzK51MzWLwHh8tUM3UBBSBrLlgup/OOCGTvumM",
+ "ciphertext": "AwgAErABoRxwpMipdgiwXgu46rHiWQ0DmRj0qUlPrMraBUDk"
+ "leTnJRljpuc7IOhsYbLY3uo2WI0ab/ob41sV+3JEIhODJPqH"
+ "TK7cEZaIL+/up9e+dT9VGF5kRTWinzjkeqO8FU5kfdRjm+3w"
+ "0sy3o1OCpXXCfO+faPhbV/0HuK4ndx1G+myNfK1Nk/CxfMcT"
+ "BT+zDS/Df/QePAHVbrr9uuGB7fW8ogW/ulnydgZPRluusFGv"
+ "J3+cg9LoPpZPAmv5Me3ec7NtdlfN0oDZ0gk3TiNkkhsxDG9Y"
+ "YcNzl78USI0q8+kOV26Bu5dOBpU4WOuojXZHJlP5lMgdzLLl"
+ "EQ0",
+ "session_id": "IigqfNWLL+ez/Is+Duwp2s4HuCZhFG9b9CZKTYHtQ4A",
+ "device_id": "AHQDUSTAAA",
+ },
+ tok=other_access_token,
+ )
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+
+ # Make the push succeed
+ self.push_attempts[0][0].callback({})
+ self.pump()
+
+ # Check our push made it with high priority
+ self.assertEqual(len(self.push_attempts), 1)
+ self.assertEqual(self.push_attempts[0][1], "example.com")
+ self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high")
+
+ # Add yet another person — we want to make this room not a 1:1
+ # (as encrypted messages in a 1:1 currently have tweaks applied
+ # so it doesn't properly exercise the condition of all encrypted
+ # messages need to be high).
+ self.helper.join(
+ room=room, user=yet_another_user_id, tok=yet_another_access_token
+ )
+
+ # Check no push notifications are sent regarding the membership changes
+ # (that would confuse the test)
+ self.pump()
+ self.assertEqual(len(self.push_attempts), 1)
+
+ # Send another encrypted event
+ self.helper.send_event(
+ room,
+ "m.room.encrypted",
+ content={
+ "ciphertext": "AwgAEoABtEuic/2DF6oIpNH+q/PonzlhXOVho8dTv0tzFr5m"
+ "9vTo50yabx3nxsRlP2WxSqa8I07YftP+EKWCWJvTkg6o7zXq"
+ "6CK+GVvLQOVgK50SfvjHqJXN+z1VEqj+5mkZVN/cAgJzoxcH"
+ "zFHkwDPJC8kQs47IHd8EO9KBUK4v6+NQ1uE/BIak4qAf9aS/"
+ "kI+f0gjn9IY9K6LXlah82A/iRyrIrxkCkE/n0VfvLhaWFecC"
+ "sAWTcMLoF6fh1Jpke95mljbmFSpsSd/eEQw",
+ "device_id": "SRCFTWTHXO",
+ "session_id": "eMA+bhGczuTz1C5cJR1YbmrnnC6Goni4lbvS5vJ1nG4",
+ "algorithm": "m.megolm.v1.aes-sha2",
+ "sender_key": "rC/XSIAiYrVGSuaHMop8/pTZbku4sQKBZwRwukgnN1c",
+ },
+ tok=other_access_token,
+ )
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+ self.assertEqual(len(self.push_attempts), 2)
+ self.assertEqual(self.push_attempts[1][1], "example.com")
+ self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "high")
+
+ def test_sends_high_priority_for_one_to_one_only(self):
+ """
+ The HTTP pusher will send pushes at high priority if they correspond
+ to a message in a one-to-one room.
+ """
+ # Register the user who gets notified
+ user_id = self.register_user("user", "pass")
+ access_token = self.login("user", "pass")
+
+ # Register the user who sends the message
+ other_user_id = self.register_user("otheruser", "pass")
+ other_access_token = self.login("otheruser", "pass")
+
+ # Register a third user
+ yet_another_user_id = self.register_user("yetanotheruser", "pass")
+ yet_another_access_token = self.login("yetanotheruser", "pass")
+
+ # Create a room
+ room = self.helper.create_room_as(user_id, tok=access_token)
+
+ # The other user joins
+ self.helper.join(room=room, user=other_user_id, tok=other_access_token)
+
+ # Register the pusher
+ user_tuple = self.get_success(
+ self.hs.get_datastore().get_user_by_access_token(access_token)
+ )
+ token_id = user_tuple["token_id"]
+
+ self.get_success(
+ self.hs.get_pusherpool().add_pusher(
+ user_id=user_id,
+ access_token=token_id,
+ kind="http",
+ app_id="m.http",
+ app_display_name="HTTP Push Notifications",
+ device_display_name="pushy push",
+ pushkey="a@example.com",
+ lang=None,
+ data={"url": "example.com"},
+ )
+ )
+
+ # Send a message
+ self.helper.send(room, body="Hi!", tok=other_access_token)
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+
+ # Make the push succeed
+ self.push_attempts[0][0].callback({})
+ self.pump()
+
+ # Check our push made it with high priority — this is a one-to-one room
+ self.assertEqual(len(self.push_attempts), 1)
+ self.assertEqual(self.push_attempts[0][1], "example.com")
+ self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high")
+
+ # Yet another user joins
+ self.helper.join(
+ room=room, user=yet_another_user_id, tok=yet_another_access_token
+ )
+
+ # Check no push notifications are sent regarding the membership changes
+ # (that would confuse the test)
+ self.pump()
+ self.assertEqual(len(self.push_attempts), 1)
+
+ # Send another event
+ self.helper.send(room, body="Welcome!", tok=other_access_token)
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+ self.assertEqual(len(self.push_attempts), 2)
+ self.assertEqual(self.push_attempts[1][1], "example.com")
+
+ # check that this is low-priority
+ self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low")
+
+ def test_sends_high_priority_for_mention(self):
+ """
+ The HTTP pusher will send pushes at high priority if they correspond
+ to a message containing the user's display name.
+ """
+ # Register the user who gets notified
+ user_id = self.register_user("user", "pass")
+ access_token = self.login("user", "pass")
+
+ # Register the user who sends the message
+ other_user_id = self.register_user("otheruser", "pass")
+ other_access_token = self.login("otheruser", "pass")
+
+ # Register a third user
+ yet_another_user_id = self.register_user("yetanotheruser", "pass")
+ yet_another_access_token = self.login("yetanotheruser", "pass")
+
+ # Create a room
+ room = self.helper.create_room_as(user_id, tok=access_token)
+
+ # The other users join
+ self.helper.join(room=room, user=other_user_id, tok=other_access_token)
+ self.helper.join(
+ room=room, user=yet_another_user_id, tok=yet_another_access_token
+ )
+
+ # Register the pusher
+ user_tuple = self.get_success(
+ self.hs.get_datastore().get_user_by_access_token(access_token)
+ )
+ token_id = user_tuple["token_id"]
+
+ self.get_success(
+ self.hs.get_pusherpool().add_pusher(
+ user_id=user_id,
+ access_token=token_id,
+ kind="http",
+ app_id="m.http",
+ app_display_name="HTTP Push Notifications",
+ device_display_name="pushy push",
+ pushkey="a@example.com",
+ lang=None,
+ data={"url": "example.com"},
+ )
+ )
+
+ # Send a message
+ self.helper.send(room, body="Oh, user, hello!", tok=other_access_token)
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+
+ # Make the push succeed
+ self.push_attempts[0][0].callback({})
+ self.pump()
+
+ # Check our push made it with high priority
+ self.assertEqual(len(self.push_attempts), 1)
+ self.assertEqual(self.push_attempts[0][1], "example.com")
+ self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high")
+
+ # Send another event, this time with no mention
+ self.helper.send(room, body="Are you there?", tok=other_access_token)
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+ self.assertEqual(len(self.push_attempts), 2)
+ self.assertEqual(self.push_attempts[1][1], "example.com")
+
+ # check that this is low-priority
+ self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low")
+
+ def test_sends_high_priority_for_atroom(self):
+ """
+ The HTTP pusher will send pushes at high priority if they correspond
+ to a message that contains @room.
+ """
+ # Register the user who gets notified
+ user_id = self.register_user("user", "pass")
+ access_token = self.login("user", "pass")
+
+ # Register the user who sends the message
+ other_user_id = self.register_user("otheruser", "pass")
+ other_access_token = self.login("otheruser", "pass")
+
+ # Register a third user
+ yet_another_user_id = self.register_user("yetanotheruser", "pass")
+ yet_another_access_token = self.login("yetanotheruser", "pass")
+
+ # Create a room (as other_user so the power levels are compatible with
+ # other_user sending @room).
+ room = self.helper.create_room_as(other_user_id, tok=other_access_token)
+
+ # The other users join
+ self.helper.join(room=room, user=user_id, tok=access_token)
+ self.helper.join(
+ room=room, user=yet_another_user_id, tok=yet_another_access_token
+ )
+
+ # Register the pusher
+ user_tuple = self.get_success(
+ self.hs.get_datastore().get_user_by_access_token(access_token)
+ )
+ token_id = user_tuple["token_id"]
+
+ self.get_success(
+ self.hs.get_pusherpool().add_pusher(
+ user_id=user_id,
+ access_token=token_id,
+ kind="http",
+ app_id="m.http",
+ app_display_name="HTTP Push Notifications",
+ device_display_name="pushy push",
+ pushkey="a@example.com",
+ lang=None,
+ data={"url": "example.com"},
+ )
+ )
+
+ # Send a message
+ self.helper.send(
+ room,
+ body="@room eeek! There's a spider on the table!",
+ tok=other_access_token,
+ )
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+
+ # Make the push succeed
+ self.push_attempts[0][0].callback({})
+ self.pump()
+
+ # Check our push made it with high priority
+ self.assertEqual(len(self.push_attempts), 1)
+ self.assertEqual(self.push_attempts[0][1], "example.com")
+ self.assertEqual(self.push_attempts[0][2]["notification"]["prio"], "high")
+
+ # Send another event, this time as someone without the power of @room
+ self.helper.send(
+ room, body="@room the spider is gone", tok=yet_another_access_token
+ )
+
+ # Advance time a bit, so the pusher will register something has happened
+ self.pump()
+ self.assertEqual(len(self.push_attempts), 2)
+ self.assertEqual(self.push_attempts[1][1], "example.com")
+
+ # check that this is low-priority
+ self.assertEqual(self.push_attempts[1][2]["notification"]["prio"], "low")
diff --git a/tests/rest/client/v2_alpha/test_account.py b/tests/rest/client/v2_alpha/test_account.py
index 3ab611f618..152a5182fa 100644
--- a/tests/rest/client/v2_alpha/test_account.py
+++ b/tests/rest/client/v2_alpha/test_account.py
@@ -108,6 +108,46 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
# Assert we can't log in with the old password
self.attempt_wrong_password_login("kermit", old_password)
+ def test_basic_password_reset_canonicalise_email(self):
+ """Test basic password reset flow
+ Request password reset with different spelling
+ """
+ old_password = "monkey"
+ new_password = "kangeroo"
+
+ user_id = self.register_user("kermit", old_password)
+ self.login("kermit", old_password)
+
+ email_profile = "test@example.com"
+ email_passwort_reset = "TEST@EXAMPLE.COM"
+
+ # Add a threepid
+ self.get_success(
+ self.store.user_add_threepid(
+ user_id=user_id,
+ medium="email",
+ address=email_profile,
+ validated_at=0,
+ added_at=0,
+ )
+ )
+
+ client_secret = "foobar"
+ session_id = self._request_token(email_passwort_reset, client_secret)
+
+ self.assertEquals(len(self.email_attempts), 1)
+ link = self._get_link_from_email()
+
+ self._validate_token(link)
+
+ self._reset_password(new_password, session_id, client_secret)
+
+ # Assert we can log in with the new password
+ self.login("kermit", new_password)
+
+ # Assert we can't log in with the old password
+ self.attempt_wrong_password_login("kermit", old_password)
+
def test_cant_reset_password_without_clicking_link(self):
"""Test that we do actually need to click the link in the email
"""
@@ -386,44 +426,67 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
self.email = "test@example.com"
self.url_3pid = b"account/3pid"
- def test_add_email(self):
- """Test adding an email to profile
- """
- client_secret = "foobar"
- session_id = self._request_token(self.email, client_secret)
+ def test_add_valid_email(self):
+ self.get_success(self._add_email(self.email, self.email))
- self.assertEquals(len(self.email_attempts), 1)
- link = self._get_link_from_email()
+ def test_add_valid_email_second_time(self):
+ self.get_success(self._add_email(self.email, self.email))
+ self.get_success(
+ self._request_token_invalid_email(
+ self.email,
+ expected_errcode=Codes.THREEPID_IN_USE,
+ expected_error="Email is already in use",
+ )
+ )
- self._validate_token(link)
+ def test_add_valid_email_second_time_canonicalise(self):
+ self.get_success(self._add_email(self.email, self.email))
+ self.get_success(
+ self._request_token_invalid_email(
+ "TEST@EXAMPLE.COM",
+ expected_errcode=Codes.THREEPID_IN_USE,
+ expected_error="Email is already in use",
+ )
+ )
- request, channel = self.make_request(
- "POST",
- b"/_matrix/client/unstable/account/3pid/add",
- {
- "client_secret": client_secret,
- "sid": session_id,
- "auth": {
- "type": "m.login.password",
- "user": self.user_id,
- "password": "test",
- },
- },
- access_token=self.user_id_tok,
+ def test_add_email_no_at(self):
+ self.get_success(
+ self._request_token_invalid_email(
+ "address-without-at.bar",
+ expected_errcode=Codes.UNKNOWN,
+ expected_error="Unable to parse email address",
+ )
)
- self.render(request)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ def test_add_email_two_at(self):
+ self.get_success(
+ self._request_token_invalid_email(
+ "foo@foo@test.bar",
+ expected_errcode=Codes.UNKNOWN,
+ expected_error="Unable to parse email address",
+ )
+ )
- # Get user
- request, channel = self.make_request(
- "GET", self.url_3pid, access_token=self.user_id_tok,
+ def test_add_email_bad_format(self):
+ self.get_success(
+ self._request_token_invalid_email(
+ "user@bad.example.net@good.example.com",
+ expected_errcode=Codes.UNKNOWN,
+ expected_error="Unable to parse email address",
+ )
)
- self.render(request)
- self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
- self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
- self.assertEqual(self.email, channel.json_body["threepids"][0]["address"])
+ def test_add_email_domain_to_lower(self):
+ self.get_success(self._add_email("foo@TEST.BAR", "foo@test.bar"))
+
+ def test_add_email_domain_with_umlaut(self):
+ self.get_success(self._add_email("foo@Öumlaut.com", "foo@öumlaut.com"))
+
+ def test_add_email_address_casefold(self):
+ self.get_success(self._add_email("Strauß@Example.com", "strauss@example.com"))
+
+ def test_address_trim(self):
+ self.get_success(self._add_email(" foo@test.bar ", "foo@test.bar"))
def test_add_email_if_disabled(self):
"""Test adding email to profile when doing so is disallowed
@@ -616,6 +679,19 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
return channel.json_body["sid"]
+ def _request_token_invalid_email(
+ self, email, expected_errcode, expected_error, client_secret="foobar",
+ ):
+ request, channel = self.make_request(
+ "POST",
+ b"account/3pid/email/requestToken",
+ {"client_secret": client_secret, "email": email, "send_attempt": 1},
+ )
+ self.render(request)
+ self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual(expected_errcode, channel.json_body["errcode"])
+ self.assertEqual(expected_error, channel.json_body["error"])
+
def _validate_token(self, link):
# Remove the host
path = link.replace("https://example.com", "")
@@ -643,3 +719,42 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
assert match, "Could not find link in email"
return match.group(0)
+
+ def _add_email(self, request_email, expected_email):
+ """Test adding an email to profile
+ """
+ client_secret = "foobar"
+ session_id = self._request_token(request_email, client_secret)
+
+ self.assertEquals(len(self.email_attempts), 1)
+ link = self._get_link_from_email()
+
+ self._validate_token(link)
+
+ request, channel = self.make_request(
+ "POST",
+ b"/_matrix/client/unstable/account/3pid/add",
+ {
+ "client_secret": client_secret,
+ "sid": session_id,
+ "auth": {
+ "type": "m.login.password",
+ "user": self.user_id,
+ "password": "test",
+ },
+ },
+ access_token=self.user_id_tok,
+ )
+
+ self.render(request)
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+ # Get user
+ request, channel = self.make_request(
+ "GET", self.url_3pid, access_token=self.user_id_tok,
+ )
+ self.render(request)
+
+ self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+ self.assertEqual("email", channel.json_body["threepids"][0]["medium"])
+ self.assertEqual(expected_email, channel.json_body["threepids"][0]["address"])
diff --git a/tests/test_server.py b/tests/test_server.py
index 3f6f468e5b..030f58cbdc 100644
--- a/tests/test_server.py
+++ b/tests/test_server.py
@@ -24,12 +24,7 @@ from twisted.web.server import NOT_DONE_YET
from synapse.api.errors import Codes, RedirectException, SynapseError
from synapse.config.server import parse_listener_def
-from synapse.http.server import (
- DirectServeResource,
- JsonResource,
- OptionsResource,
- wrap_html_request_handler,
-)
+from synapse.http.server import DirectServeHtmlResource, JsonResource, OptionsResource
from synapse.http.site import SynapseSite, logger
from synapse.logging.context import make_deferred_yieldable
from synapse.util import Clock
@@ -256,12 +251,11 @@ class OptionsResourceTests(unittest.TestCase):
class WrapHtmlRequestHandlerTests(unittest.TestCase):
- class TestResource(DirectServeResource):
+ class TestResource(DirectServeHtmlResource):
callback = None
- @wrap_html_request_handler
async def _async_render_GET(self, request):
- return await self.callback(request)
+ await self.callback(request)
def setUp(self):
self.reactor = ThreadedMemoryReactorClock()
diff --git a/tests/util/test_threepids.py b/tests/util/test_threepids.py
new file mode 100644
index 0000000000..5513724d87
--- /dev/null
+++ b/tests/util/test_threepids.py
@@ -0,0 +1,49 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 Dirk Klimpel
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.util.threepids import canonicalise_email
+
+from tests.unittest import HomeserverTestCase
+
+
+class CanonicaliseEmailTests(HomeserverTestCase):
+ def test_no_at(self):
+ with self.assertRaises(ValueError):
+ canonicalise_email("address-without-at.bar")
+
+ def test_two_at(self):
+ with self.assertRaises(ValueError):
+ canonicalise_email("foo@foo@test.bar")
+
+ def test_bad_format(self):
+ with self.assertRaises(ValueError):
+ canonicalise_email("user@bad.example.net@good.example.com")
+
+ def test_valid_format(self):
+ self.assertEqual(canonicalise_email("foo@test.bar"), "foo@test.bar")
+
+ def test_domain_to_lower(self):
+ self.assertEqual(canonicalise_email("foo@TEST.BAR"), "foo@test.bar")
+
+ def test_domain_with_umlaut(self):
+ self.assertEqual(canonicalise_email("foo@Öumlaut.com"), "foo@öumlaut.com")
+
+ def test_address_casefold(self):
+ self.assertEqual(
+ canonicalise_email("Strauß@Example.com"), "strauss@example.com"
+ )
+
+ def test_address_trim(self):
+ self.assertEqual(canonicalise_email(" foo@test.bar "), "foo@test.bar")
|