summary refs log tree commit diff
path: root/tests/unittest.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/unittest.py')
-rw-r--r--tests/unittest.py85
1 files changed, 77 insertions, 8 deletions
diff --git a/tests/unittest.py b/tests/unittest.py
index 26204470b1..36df43c137 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -27,11 +27,12 @@ import twisted.logger
 from twisted.internet.defer import Deferred
 from twisted.trial import unittest
 
+from synapse.api.constants import EventTypes
 from synapse.config.homeserver import HomeServerConfig
 from synapse.http.server import JsonResource
 from synapse.http.site import SynapseRequest
 from synapse.server import HomeServer
-from synapse.types import UserID, create_requester
+from synapse.types import Requester, UserID, create_requester
 from synapse.util.logcontext import LoggingContext
 
 from tests.server import get_clock, make_request, render, setup_test_homeserver
@@ -297,7 +298,7 @@ class HomeserverTestCase(TestCase):
             Tuple[synapse.http.site.SynapseRequest, channel]
         """
         if isinstance(content, dict):
-            content = json.dumps(content).encode('utf8')
+            content = json.dumps(content).encode("utf8")
 
         return make_request(
             self.reactor,
@@ -341,7 +342,7 @@ class HomeserverTestCase(TestCase):
 
         # Parse the config from a config dict into a HomeServerConfig
         config_obj = HomeServerConfig()
-        config_obj.parse_config_dict(config)
+        config_obj.parse_config_dict(config, "", "")
         kwargs["config"] = config_obj
 
         hs = setup_test_homeserver(self.addCleanup, *args, **kwargs)
@@ -388,7 +389,7 @@ class HomeserverTestCase(TestCase):
         Returns:
             The MXID of the new user (unicode).
         """
-        self.hs.config.registration_shared_secret = u"shared"
+        self.hs.config.registration_shared_secret = "shared"
 
         # Create the user
         request, channel = self.make_request("GET", "/_matrix/client/r0/admin/register")
@@ -396,13 +397,13 @@ class HomeserverTestCase(TestCase):
         nonce = channel.json_body["nonce"]
 
         want_mac = hmac.new(key=b"shared", digestmod=hashlib.sha1)
-        nonce_str = b"\x00".join([username.encode('utf8'), password.encode('utf8')])
+        nonce_str = b"\x00".join([username.encode("utf8"), password.encode("utf8")])
         if admin:
             nonce_str += b"\x00admin"
         else:
             nonce_str += b"\x00notadmin"
 
-        want_mac.update(nonce.encode('ascii') + b"\x00" + nonce_str)
+        want_mac.update(nonce.encode("ascii") + b"\x00" + nonce_str)
         want_mac = want_mac.hexdigest()
 
         body = json.dumps(
@@ -415,7 +416,7 @@ class HomeserverTestCase(TestCase):
             }
         )
         request, channel = self.make_request(
-            "POST", "/_matrix/client/r0/admin/register", body.encode('utf8')
+            "POST", "/_matrix/client/r0/admin/register", body.encode("utf8")
         )
         self.render(request)
         self.assertEqual(channel.code, 200)
@@ -434,10 +435,78 @@ class HomeserverTestCase(TestCase):
             body["device_id"] = device_id
 
         request, channel = self.make_request(
-            "POST", "/_matrix/client/r0/login", json.dumps(body).encode('utf8')
+            "POST", "/_matrix/client/r0/login", json.dumps(body).encode("utf8")
         )
         self.render(request)
         self.assertEqual(channel.code, 200, channel.result)
 
         access_token = channel.json_body["access_token"]
         return access_token
+
+    def create_and_send_event(
+        self, room_id, user, soft_failed=False, prev_event_ids=None
+    ):
+        """
+        Create and send an event.
+
+        Args:
+            soft_failed (bool): Whether to create a soft failed event or not
+            prev_event_ids (list[str]|None): Explicitly set the prev events,
+                or if None just use the default
+
+        Returns:
+            str: The new event's ID.
+        """
+        event_creator = self.hs.get_event_creation_handler()
+        secrets = self.hs.get_secrets()
+        requester = Requester(user, None, False, None, None)
+
+        prev_events_and_hashes = None
+        if prev_event_ids:
+            prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
+
+        event, context = self.get_success(
+            event_creator.create_event(
+                requester,
+                {
+                    "type": EventTypes.Message,
+                    "room_id": room_id,
+                    "sender": user.to_string(),
+                    "content": {"body": secrets.token_hex(), "msgtype": "m.text"},
+                },
+                prev_events_and_hashes=prev_events_and_hashes,
+            )
+        )
+
+        if soft_failed:
+            event.internal_metadata.soft_failed = True
+
+        self.get_success(event_creator.send_nonmember_event(requester, event, context))
+
+        return event.event_id
+
+    def add_extremity(self, room_id, event_id):
+        """
+        Add the given event as an extremity to the room.
+        """
+        self.get_success(
+            self.hs.get_datastore()._simple_insert(
+                table="event_forward_extremities",
+                values={"room_id": room_id, "event_id": event_id},
+                desc="test_add_extremity",
+            )
+        )
+
+        self.hs.get_datastore().get_latest_event_ids_in_room.invalidate((room_id,))
+
+    def attempt_wrong_password_login(self, username, password):
+        """Attempts to login as the user with the given password, asserting
+        that the attempt *fails*.
+        """
+        body = {"type": "m.login.password", "user": username, "password": password}
+
+        request, channel = self.make_request(
+            "POST", "/_matrix/client/r0/login", json.dumps(body).encode("utf8")
+        )
+        self.render(request)
+        self.assertEqual(channel.code, 403, channel.result)