From e083f0fd89a9fe03efd8122f5cab872cf9ffb735 Mon Sep 17 00:00:00 2001 From: Andrew Morgan Date: Mon, 10 May 2021 14:46:38 +0100 Subject: wip tests --- synapse/rest/admin/server_notice_servlet.py | 5 +- tests/module_api/test_api.py | 276 +++++++++++++++------- tests/replication/test_sharded_event_persister.py | 2 +- tests/utils.py | 2 +- 4 files changed, 194 insertions(+), 91 deletions(-) diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py index cc3ab5854b..3def03515f 100644 --- a/synapse/rest/admin/server_notice_servlet.py +++ b/synapse/rest/admin/server_notice_servlet.py @@ -54,7 +54,6 @@ class SendServerNoticeServlet(RestServlet): self.hs = hs self.auth = hs.get_auth() self.txns = HttpTransactionCache(hs) - self.snm = hs.get_server_notices_manager() def register(self, json_resource: HttpServer): PATTERN = "/send_server_notice" @@ -77,7 +76,7 @@ class SendServerNoticeServlet(RestServlet): event_type = body.get("type", EventTypes.Message) state_key = body.get("state_key") - if not self.snm.is_enabled(): + if not self.hs.get_server_notices_manager().is_enabled(): raise SynapseError(400, "Server notices are not enabled on this server") user_id = body["user_id"] @@ -85,7 +84,7 @@ class SendServerNoticeServlet(RestServlet): if not self.hs.is_mine_id(user_id): raise SynapseError(400, "Server notices can only be sent to local users") - event = await self.snm.send_notice( + event = await self.hs.get_server_notices_manager().send_notice( user_id=body["user_id"], type=event_type, state_key=state_key, diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py index 742ad14b8c..78f666f95c 100644 --- a/tests/module_api/test_api.py +++ b/tests/module_api/test_api.py @@ -19,14 +19,16 @@ from synapse.federation.units import Transaction from synapse.handlers.presence import UserPresenceState from synapse.rest import admin from synapse.rest.client.v1 import login, presence, room -from synapse.types import create_requester +from synapse.types import create_requester, StreamToken from tests.events.test_presence_router import send_presence_update, sync_presence from tests.test_utils.event_injection import inject_member_event -from tests.unittest import FederatingHomeserverTestCase, override_config +from tests.unittest import HomeserverTestCase, override_config +from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.utils import USE_POSTGRES_FOR_TESTS -class ModuleApiTestCase(FederatingHomeserverTestCase): +class ModuleApiTestCase(HomeserverTestCase): servlets = [ admin.register_servlets, login.register_servlets, @@ -217,90 +219,9 @@ class ModuleApiTestCase(FederatingHomeserverTestCase): ) self.assertFalse(is_in_public_rooms) - # The ability to send federation is required by send_local_online_presence_to. - @override_config({"send_federation": True}) def test_send_local_online_presence_to(self): - """Tests that send_local_presence_to_users sends local online presence to local users.""" - # Create a user who will send presence updates - self.presence_receiver_id = self.register_user("presence_receiver", "monkey") - self.presence_receiver_tok = self.login("presence_receiver", "monkey") - - # And another user that will send presence updates out - self.presence_sender_id = self.register_user("presence_sender", "monkey") - self.presence_sender_tok = self.login("presence_sender", "monkey") - - # Put them in a room together so they will receive each other's presence updates - room_id = self.helper.create_room_as( - self.presence_receiver_id, - tok=self.presence_receiver_tok, - ) - self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) - - # Presence sender comes online - send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, - "online", - "I'm online!", - ) - - # Presence receiver should have received it - presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) - self.assertEqual(len(presence_updates), 1) - - presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") - - # Syncing again should result in no presence updates - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 0) - - # Trigger sending local online presence - self.get_success( - self.module_api.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) - ) - - # Presence receiver should have received online presence again - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 1) - - presence_update = presence_updates[0] # type: UserPresenceState - self.assertEqual(presence_update.user_id, self.presence_sender_id) - self.assertEqual(presence_update.state, "online") - - # Presence sender goes offline - send_presence_update( - self, - self.presence_sender_id, - self.presence_sender_tok, - "offline", - "I slink back into the darkness.", - ) - - # Trigger sending local online presence - self.get_success( - self.module_api.send_local_online_presence_to( - [ - self.presence_receiver_id, - ] - ) - ) - - # Presence receiver should *not* have received offline state - presence_updates, sync_token = sync_presence( - self, self.presence_receiver_id, sync_token - ) - self.assertEqual(len(presence_updates), 0) + # Test sending local online presence to users from the main process + _test_sending_local_online_presence_to_local_user(self, test_with_workers=False) @override_config({"send_federation": True}) def test_send_local_online_presence_to_federation(self): @@ -374,3 +295,186 @@ class ModuleApiTestCase(FederatingHomeserverTestCase): found_update = True self.assertTrue(found_update) + + +class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase): + """For testing ModuleApi functionality in a multi-worker setup""" + + # Testing stream ID replication from the main to worker processes requires postgres + # (due to needing `MultiWriterIdGenerator`). + if not USE_POSTGRES_FOR_TESTS: + skip = "Requires Postgres" + + servlets = [ + admin.register_servlets, + login.register_servlets, + room.register_servlets, + presence.register_servlets, + ] + + def prepare(self, reactor, clock, homeserver): + self.store = homeserver.get_datastore() + self.module_api = homeserver.get_module_api() + self.event_creation_handler = homeserver.get_event_creation_handler() + self.sync_handler = homeserver.get_sync_handler() + + def make_homeserver(self, reactor, clock): + return self.setup_test_homeserver() + + def _get_worker_hs_config(self) -> dict: + config = self.default_config() + config["worker_app"] = "synapse.app.generic_worker" + config["worker_replication_host"] = "testserv" + config["worker_replication_http_port"] = "8765" + + return config + + def test_send_local_online_presence_to_workers(self): + # Test sending local online presence to users from a worker process + _test_sending_local_online_presence_to_local_user(self, test_with_workers=True) + + +def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, test_with_workers: bool = False): + """Tests that send_local_presence_to_users sends local online presence to local users. + + Args: + test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a + worker process. The test users will still sync with the main process. The purpose of testing + with a worker is to check whether a Synapse module running on a worker can inform other workers/ + the main process that they should include additional presence when a user next syncs. + """ + if test_with_workers: + # Create a worker process to make module_api calls against + worker_hs = self.make_worker_hs("synapse.app.client_reader") + + # Create a user who will send presence updates + self.presence_receiver_id = self.register_user("presence_receiver", "monkey") + self.presence_receiver_tok = self.login("presence_receiver", "monkey") + + # And another user that will send presence updates out + self.presence_sender_id = self.register_user("presence_sender", "monkey") + self.presence_sender_tok = self.login("presence_sender", "monkey") + + # Put them in a room together so they will receive each other's presence updates + room_id = self.helper.create_room_as( + self.presence_receiver_id, + tok=self.presence_receiver_tok, + ) + self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok) + + # Presence sender comes online + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "online", + "I'm online!", + ) + + # Presence receiver should have received it + presence_updates, sync_token = sync_presence(self, self.presence_receiver_id) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + if test_with_workers: + # Replicate the current sync presence token from the main process to the worker process. + # We need to do this so that the worker process knows the current presence stream ID to + # insert into the database when we call ModuleApi.send_local_online_presence_to. + self.replicate() + + # Syncing again should result in no presence updates + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 0) + + # We do an (initial) sync with a second "device" now, getting a new sync token. + # We'll use this in a moment. + _, sync_token_second_device = sync_presence(self, self.presence_receiver_id) + + # Determine on which worker to call ModuleApi.send_local_online_presence_to on + if test_with_workers: + module_api_to_use = worker_hs.get_module_api() + else: + module_api_to_use = self.module_api + + # Trigger sending local online presence on the worker process. We expect this information + # to be saved to the database where all other workers can access it. + self.get_success( + module_api_to_use.send_local_online_presence_to( + [ + self.presence_receiver_id, + ] + ) + ) + + if test_with_workers: + self.replicate() + self.pump(0.1) + + # The presence receiver should have received online presence again. + print("Sync token initially:", sync_token) + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 1) + print("Sync token after a sync:", sync_token) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + # We attempt to sync with the second sync token we received above - just to check that + # multiple syncing devices will each receive the necessary online presence. + presence_updates, sync_token_second_device = sync_presence( + self, self.presence_receiver_id, sync_token_second_device + ) + self.assertEqual(len(presence_updates), 1) + + presence_update = presence_updates[0] # type: UserPresenceState + self.assertEqual(presence_update.user_id, self.presence_sender_id) + self.assertEqual(presence_update.state, "online") + + # However, if we now sync with either "device", we won't receive another burst of online presence + # until the API is called again sometime in the future + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + print("Sync token after the second sync:", sync_token) + print(presence_updates) + + # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to. + + # Presence sender goes offline + send_presence_update( + self, + self.presence_sender_id, + self.presence_sender_tok, + "offline", + "I slink back into the darkness.", + ) + + # Presence receiver should have received the updated, offline state + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + print("Sync token after the third sync:", sync_token) + self.assertEqual(len(presence_updates), 1) + + # Now trigger sending local online presence. + self.get_success( + self.module_api.send_local_online_presence_to( + [ + self.presence_receiver_id, + ] + ) + ) + + # Presence receiver should *not* have received offline state + presence_updates, sync_token = sync_presence( + self, self.presence_receiver_id, sync_token + ) + self.assertEqual(len(presence_updates), 0) diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py index d739eb6b17..5eca5c165d 100644 --- a/tests/replication/test_sharded_event_persister.py +++ b/tests/replication/test_sharded_event_persister.py @@ -30,7 +30,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase): """Checks event persisting sharding works""" # Event persister sharding requires postgres (due to needing - # `MutliWriterIdGenerator`). + # `MultiWriterIdGenerator`). if not USE_POSTGRES_FOR_TESTS: skip = "Requires Postgres" diff --git a/tests/utils.py b/tests/utils.py index 6bd008dcfe..f695a4d963 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -236,7 +236,7 @@ def setup_test_homeserver( else: database_config = { "name": "sqlite3", - "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1}, + "args": {"database": "test.db", "cp_min": 1, "cp_max": 1}, } database = DatabaseConnectionConfig("master", database_config) -- cgit 1.4.1