summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-05-10 14:46:38 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2021-05-11 17:50:10 +0100
commite083f0fd89a9fe03efd8122f5cab872cf9ffb735 (patch)
treecc8c79a6bff0ef919d254282a83627a62ecc02fb
parentAdd presence_stream_id column to users_to_send_full_presence_to table (diff)
downloadsynapse-e083f0fd89a9fe03efd8122f5cab872cf9ffb735.tar.xz
wip tests
-rw-r--r--synapse/rest/admin/server_notice_servlet.py5
-rw-r--r--tests/module_api/test_api.py276
-rw-r--r--tests/replication/test_sharded_event_persister.py2
-rw-r--r--tests/utils.py2
4 files changed, 194 insertions, 91 deletions
diff --git a/synapse/rest/admin/server_notice_servlet.py b/synapse/rest/admin/server_notice_servlet.py
index cc3ab5854b..3def03515f 100644
--- a/synapse/rest/admin/server_notice_servlet.py
+++ b/synapse/rest/admin/server_notice_servlet.py
@@ -54,7 +54,6 @@ class SendServerNoticeServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.txns = HttpTransactionCache(hs)
-        self.snm = hs.get_server_notices_manager()
 
     def register(self, json_resource: HttpServer):
         PATTERN = "/send_server_notice"
@@ -77,7 +76,7 @@ class SendServerNoticeServlet(RestServlet):
         event_type = body.get("type", EventTypes.Message)
         state_key = body.get("state_key")
 
-        if not self.snm.is_enabled():
+        if not self.hs.get_server_notices_manager().is_enabled():
             raise SynapseError(400, "Server notices are not enabled on this server")
 
         user_id = body["user_id"]
@@ -85,7 +84,7 @@ class SendServerNoticeServlet(RestServlet):
         if not self.hs.is_mine_id(user_id):
             raise SynapseError(400, "Server notices can only be sent to local users")
 
-        event = await self.snm.send_notice(
+        event = await self.hs.get_server_notices_manager().send_notice(
             user_id=body["user_id"],
             type=event_type,
             state_key=state_key,
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 742ad14b8c..78f666f95c 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -19,14 +19,16 @@ from synapse.federation.units import Transaction
 from synapse.handlers.presence import UserPresenceState
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, presence, room
-from synapse.types import create_requester
+from synapse.types import create_requester, StreamToken
 
 from tests.events.test_presence_router import send_presence_update, sync_presence
 from tests.test_utils.event_injection import inject_member_event
-from tests.unittest import FederatingHomeserverTestCase, override_config
+from tests.unittest import HomeserverTestCase, override_config
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+from tests.utils import USE_POSTGRES_FOR_TESTS
 
 
-class ModuleApiTestCase(FederatingHomeserverTestCase):
+class ModuleApiTestCase(HomeserverTestCase):
     servlets = [
         admin.register_servlets,
         login.register_servlets,
@@ -217,90 +219,9 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
         )
         self.assertFalse(is_in_public_rooms)
 
-    # The ability to send federation is required by send_local_online_presence_to.
-    @override_config({"send_federation": True})
     def test_send_local_online_presence_to(self):
-        """Tests that send_local_presence_to_users sends local online presence to local users."""
-        # Create a user who will send presence updates
-        self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
-        self.presence_receiver_tok = self.login("presence_receiver", "monkey")
-
-        # And another user that will send presence updates out
-        self.presence_sender_id = self.register_user("presence_sender", "monkey")
-        self.presence_sender_tok = self.login("presence_sender", "monkey")
-
-        # Put them in a room together so they will receive each other's presence updates
-        room_id = self.helper.create_room_as(
-            self.presence_receiver_id,
-            tok=self.presence_receiver_tok,
-        )
-        self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
-
-        # Presence sender comes online
-        send_presence_update(
-            self,
-            self.presence_sender_id,
-            self.presence_sender_tok,
-            "online",
-            "I'm online!",
-        )
-
-        # Presence receiver should have received it
-        presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
-        self.assertEqual(len(presence_updates), 1)
-
-        presence_update = presence_updates[0]  # type: UserPresenceState
-        self.assertEqual(presence_update.user_id, self.presence_sender_id)
-        self.assertEqual(presence_update.state, "online")
-
-        # Syncing again should result in no presence updates
-        presence_updates, sync_token = sync_presence(
-            self, self.presence_receiver_id, sync_token
-        )
-        self.assertEqual(len(presence_updates), 0)
-
-        # Trigger sending local online presence
-        self.get_success(
-            self.module_api.send_local_online_presence_to(
-                [
-                    self.presence_receiver_id,
-                ]
-            )
-        )
-
-        # Presence receiver should have received online presence again
-        presence_updates, sync_token = sync_presence(
-            self, self.presence_receiver_id, sync_token
-        )
-        self.assertEqual(len(presence_updates), 1)
-
-        presence_update = presence_updates[0]  # type: UserPresenceState
-        self.assertEqual(presence_update.user_id, self.presence_sender_id)
-        self.assertEqual(presence_update.state, "online")
-
-        # Presence sender goes offline
-        send_presence_update(
-            self,
-            self.presence_sender_id,
-            self.presence_sender_tok,
-            "offline",
-            "I slink back into the darkness.",
-        )
-
-        # Trigger sending local online presence
-        self.get_success(
-            self.module_api.send_local_online_presence_to(
-                [
-                    self.presence_receiver_id,
-                ]
-            )
-        )
-
-        # Presence receiver should *not* have received offline state
-        presence_updates, sync_token = sync_presence(
-            self, self.presence_receiver_id, sync_token
-        )
-        self.assertEqual(len(presence_updates), 0)
+        # Test sending local online presence to users from the main process
+        _test_sending_local_online_presence_to_local_user(self, test_with_workers=False)
 
     @override_config({"send_federation": True})
     def test_send_local_online_presence_to_federation(self):
@@ -374,3 +295,186 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
                         found_update = True
 
         self.assertTrue(found_update)
+
+
+class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
+    """For testing ModuleApi functionality in a multi-worker setup"""
+
+    # Testing stream ID replication from the main to worker processes requires postgres
+    # (due to needing `MultiWriterIdGenerator`).
+    if not USE_POSTGRES_FOR_TESTS:
+        skip = "Requires Postgres"
+
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        presence.register_servlets,
+    ]
+
+    def prepare(self, reactor, clock, homeserver):
+        self.store = homeserver.get_datastore()
+        self.module_api = homeserver.get_module_api()
+        self.event_creation_handler = homeserver.get_event_creation_handler()
+        self.sync_handler = homeserver.get_sync_handler()
+
+    def make_homeserver(self, reactor, clock):
+        return self.setup_test_homeserver()
+
+    def _get_worker_hs_config(self) -> dict:
+        config = self.default_config()
+        config["worker_app"] = "synapse.app.generic_worker"
+        config["worker_replication_host"] = "testserv"
+        config["worker_replication_http_port"] = "8765"
+
+        return config
+
+    def test_send_local_online_presence_to_workers(self):
+        # Test sending local online presence to users from a worker process
+        _test_sending_local_online_presence_to_local_user(self, test_with_workers=True)
+
+
+def _test_sending_local_online_presence_to_local_user(self: HomeserverTestCase, test_with_workers: bool = False):
+    """Tests that send_local_presence_to_users sends local online presence to local users.
+
+    Args:
+        test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a
+            worker process. The test users will still sync with the main process. The purpose of testing
+            with a worker is to check whether a Synapse module running on a worker can inform other workers/
+            the main process that they should include additional presence when a user next syncs.
+    """
+    if test_with_workers:
+        # Create a worker process to make module_api calls against
+        worker_hs = self.make_worker_hs("synapse.app.client_reader")
+
+    # Create a user who will send presence updates
+    self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
+    self.presence_receiver_tok = self.login("presence_receiver", "monkey")
+
+    # And another user that will send presence updates out
+    self.presence_sender_id = self.register_user("presence_sender", "monkey")
+    self.presence_sender_tok = self.login("presence_sender", "monkey")
+
+    # Put them in a room together so they will receive each other's presence updates
+    room_id = self.helper.create_room_as(
+        self.presence_receiver_id,
+        tok=self.presence_receiver_tok,
+    )
+    self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
+
+    # Presence sender comes online
+    send_presence_update(
+        self,
+        self.presence_sender_id,
+        self.presence_sender_tok,
+        "online",
+        "I'm online!",
+    )
+
+    # Presence receiver should have received it
+    presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
+    self.assertEqual(len(presence_updates), 1)
+
+    presence_update = presence_updates[0]  # type: UserPresenceState
+    self.assertEqual(presence_update.user_id, self.presence_sender_id)
+    self.assertEqual(presence_update.state, "online")
+
+    if test_with_workers:
+        # Replicate the current sync presence token from the main process to the worker process.
+        # We need to do this so that the worker process knows the current presence stream ID to
+        # insert into the database when we call ModuleApi.send_local_online_presence_to.
+        self.replicate()
+
+    # Syncing again should result in no presence updates
+    presence_updates, sync_token = sync_presence(
+        self, self.presence_receiver_id, sync_token
+    )
+    self.assertEqual(len(presence_updates), 0)
+
+    # We do an (initial) sync with a second "device" now, getting a new sync token.
+    # We'll use this in a moment.
+    _, sync_token_second_device = sync_presence(self, self.presence_receiver_id)
+
+    # Determine on which worker to call ModuleApi.send_local_online_presence_to on
+    if test_with_workers:
+        module_api_to_use = worker_hs.get_module_api()
+    else:
+        module_api_to_use = self.module_api
+
+    # Trigger sending local online presence on the worker process. We expect this information
+    # to be saved to the database where all other workers can access it.
+    self.get_success(
+        module_api_to_use.send_local_online_presence_to(
+            [
+                self.presence_receiver_id,
+            ]
+        )
+    )
+
+    if test_with_workers:
+        self.replicate()
+    self.pump(0.1)
+
+    # The presence receiver should have received online presence again.
+    print("Sync token initially:", sync_token)
+    presence_updates, sync_token = sync_presence(
+        self, self.presence_receiver_id, sync_token
+    )
+    self.assertEqual(len(presence_updates), 1)
+    print("Sync token after a sync:", sync_token)
+
+    presence_update = presence_updates[0]  # type: UserPresenceState
+    self.assertEqual(presence_update.user_id, self.presence_sender_id)
+    self.assertEqual(presence_update.state, "online")
+
+    # We attempt to sync with the second sync token we received above - just to check that
+    # multiple syncing devices will each receive the necessary online presence.
+    presence_updates, sync_token_second_device = sync_presence(
+        self, self.presence_receiver_id, sync_token_second_device
+    )
+    self.assertEqual(len(presence_updates), 1)
+
+    presence_update = presence_updates[0]  # type: UserPresenceState
+    self.assertEqual(presence_update.user_id, self.presence_sender_id)
+    self.assertEqual(presence_update.state, "online")
+
+    # However, if we now sync with either "device", we won't receive another burst of online presence
+    # until the API is called again sometime in the future
+    presence_updates, sync_token = sync_presence(
+        self, self.presence_receiver_id, sync_token
+    )
+    print("Sync token after the second sync:", sync_token)
+    print(presence_updates)
+
+    # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to.
+
+    # Presence sender goes offline
+    send_presence_update(
+        self,
+        self.presence_sender_id,
+        self.presence_sender_tok,
+        "offline",
+        "I slink back into the darkness.",
+    )
+
+    # Presence receiver should have received the updated, offline state
+    presence_updates, sync_token = sync_presence(
+        self, self.presence_receiver_id, sync_token
+    )
+    print("Sync token after the third sync:", sync_token)
+    self.assertEqual(len(presence_updates), 1)
+
+    # Now trigger sending local online presence.
+    self.get_success(
+        self.module_api.send_local_online_presence_to(
+            [
+                self.presence_receiver_id,
+            ]
+        )
+    )
+
+    # Presence receiver should *not* have received offline state
+    presence_updates, sync_token = sync_presence(
+        self, self.presence_receiver_id, sync_token
+    )
+    self.assertEqual(len(presence_updates), 0)
diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
index d739eb6b17..5eca5c165d 100644
--- a/tests/replication/test_sharded_event_persister.py
+++ b/tests/replication/test_sharded_event_persister.py
@@ -30,7 +30,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
     """Checks event persisting sharding works"""
 
     # Event persister sharding requires postgres (due to needing
-    # `MutliWriterIdGenerator`).
+    # `MultiWriterIdGenerator`).
     if not USE_POSTGRES_FOR_TESTS:
         skip = "Requires Postgres"
 
diff --git a/tests/utils.py b/tests/utils.py
index 6bd008dcfe..f695a4d963 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -236,7 +236,7 @@ def setup_test_homeserver(
     else:
         database_config = {
             "name": "sqlite3",
-            "args": {"database": ":memory:", "cp_min": 1, "cp_max": 1},
+            "args": {"database": "test.db", "cp_min": 1, "cp_max": 1},
         }
 
     database = DatabaseConnectionConfig("master", database_config)