summary refs log tree commit diff
path: root/tests/module_api/test_api.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/module_api/test_api.py')
-rw-r--r--tests/module_api/test_api.py303
1 files changed, 216 insertions, 87 deletions
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 742ad14b8c..2c68b9a13c 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -13,6 +13,8 @@
 # limitations under the License.
 from unittest.mock import Mock
 
+from twisted.internet import defer
+
 from synapse.api.constants import EduTypes
 from synapse.events import EventBase
 from synapse.federation.units import Transaction
@@ -22,11 +24,13 @@ from synapse.rest.client.v1 import login, presence, room
 from synapse.types import create_requester
 
 from tests.events.test_presence_router import send_presence_update, sync_presence
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.test_utils.event_injection import inject_member_event
-from tests.unittest import FederatingHomeserverTestCase, override_config
+from tests.unittest import HomeserverTestCase, override_config
+from tests.utils import USE_POSTGRES_FOR_TESTS
 
 
-class ModuleApiTestCase(FederatingHomeserverTestCase):
+class ModuleApiTestCase(HomeserverTestCase):
     servlets = [
         admin.register_servlets,
         login.register_servlets,
@@ -217,97 +221,16 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
         )
         self.assertFalse(is_in_public_rooms)
 
-    # The ability to send federation is required by send_local_online_presence_to.
-    @override_config({"send_federation": True})
     def test_send_local_online_presence_to(self):
-        """Tests that send_local_presence_to_users sends local online presence to local users."""
-        # Create a user who will send presence updates
-        self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
-        self.presence_receiver_tok = self.login("presence_receiver", "monkey")
-
-        # And another user that will send presence updates out
-        self.presence_sender_id = self.register_user("presence_sender", "monkey")
-        self.presence_sender_tok = self.login("presence_sender", "monkey")
-
-        # Put them in a room together so they will receive each other's presence updates
-        room_id = self.helper.create_room_as(
-            self.presence_receiver_id,
-            tok=self.presence_receiver_tok,
-        )
-        self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
-
-        # Presence sender comes online
-        send_presence_update(
-            self,
-            self.presence_sender_id,
-            self.presence_sender_tok,
-            "online",
-            "I'm online!",
-        )
-
-        # Presence receiver should have received it
-        presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
-        self.assertEqual(len(presence_updates), 1)
-
-        presence_update = presence_updates[0]  # type: UserPresenceState
-        self.assertEqual(presence_update.user_id, self.presence_sender_id)
-        self.assertEqual(presence_update.state, "online")
-
-        # Syncing again should result in no presence updates
-        presence_updates, sync_token = sync_presence(
-            self, self.presence_receiver_id, sync_token
-        )
-        self.assertEqual(len(presence_updates), 0)
-
-        # Trigger sending local online presence
-        self.get_success(
-            self.module_api.send_local_online_presence_to(
-                [
-                    self.presence_receiver_id,
-                ]
-            )
-        )
-
-        # Presence receiver should have received online presence again
-        presence_updates, sync_token = sync_presence(
-            self, self.presence_receiver_id, sync_token
-        )
-        self.assertEqual(len(presence_updates), 1)
-
-        presence_update = presence_updates[0]  # type: UserPresenceState
-        self.assertEqual(presence_update.user_id, self.presence_sender_id)
-        self.assertEqual(presence_update.state, "online")
-
-        # Presence sender goes offline
-        send_presence_update(
-            self,
-            self.presence_sender_id,
-            self.presence_sender_tok,
-            "offline",
-            "I slink back into the darkness.",
-        )
-
-        # Trigger sending local online presence
-        self.get_success(
-            self.module_api.send_local_online_presence_to(
-                [
-                    self.presence_receiver_id,
-                ]
-            )
-        )
-
-        # Presence receiver should *not* have received offline state
-        presence_updates, sync_token = sync_presence(
-            self, self.presence_receiver_id, sync_token
-        )
-        self.assertEqual(len(presence_updates), 0)
+        # Test sending local online presence to users from the main process
+        _test_sending_local_online_presence_to_local_user(self, test_with_workers=False)
 
     @override_config({"send_federation": True})
     def test_send_local_online_presence_to_federation(self):
         """Tests that send_local_presence_to_users sends local online presence to remote users."""
         # Create a user who will send presence updates
-        self.presence_sender_id = self.register_user("presence_sender", "monkey")
-        self.presence_sender_tok = self.login("presence_sender", "monkey")
+        self.presence_sender_id = self.register_user("presence_sender1", "monkey")
+        self.presence_sender_tok = self.login("presence_sender1", "monkey")
 
         # And a room they're a part of
         room_id = self.helper.create_room_as(
@@ -374,3 +297,209 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
                         found_update = True
 
         self.assertTrue(found_update)
+
+
+class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
+    """For testing ModuleApi functionality in a multi-worker setup"""
+
+    # Testing stream ID replication from the main to worker processes requires postgres
+    # (due to needing `MultiWriterIdGenerator`).
+    if not USE_POSTGRES_FOR_TESTS:
+        skip = "Requires Postgres"
+
+    servlets = [
+        admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        presence.register_servlets,
+    ]
+
+    def default_config(self):
+        conf = super().default_config()
+        conf["redis"] = {"enabled": "true"}
+        conf["stream_writers"] = {"presence": ["presence_writer"]}
+        conf["instance_map"] = {
+            "presence_writer": {"host": "testserv", "port": 1001},
+        }
+        return conf
+
+    def prepare(self, reactor, clock, homeserver):
+        self.module_api = homeserver.get_module_api()
+        self.sync_handler = homeserver.get_sync_handler()
+
+    def test_send_local_online_presence_to_workers(self):
+        # Test sending local online presence to users from a worker process
+        _test_sending_local_online_presence_to_local_user(self, test_with_workers=True)
+
+
+def _test_sending_local_online_presence_to_local_user(
+    test_case: HomeserverTestCase, test_with_workers: bool = False
+):
+    """Tests that send_local_presence_to_users sends local online presence to local users.
+
+    This simultaneously tests two different usecases:
+        * Testing that this method works when either called from a worker or the main process.
+            - We test this by calling this method from both a TestCase that runs in monolith mode, and one that
+              runs with a main and generic_worker.
+        * Testing that multiple devices syncing simultaneously will all receive a snapshot of local,
+            online presence - but only once per device.
+
+    Args:
+        test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a
+            worker process. The test users will still sync with the main process. The purpose of testing
+            with a worker is to check whether a Synapse module running on a worker can inform other workers/
+            the main process that they should include additional presence when a user next syncs.
+    """
+    if test_with_workers:
+        # Create a worker process to make module_api calls against
+        worker_hs = test_case.make_worker_hs(
+            "synapse.app.generic_worker", {"worker_name": "presence_writer"}
+        )
+
+    # Create a user who will send presence updates
+    test_case.presence_receiver_id = test_case.register_user(
+        "presence_receiver1", "monkey"
+    )
+    test_case.presence_receiver_tok = test_case.login("presence_receiver1", "monkey")
+
+    # And another user that will send presence updates out
+    test_case.presence_sender_id = test_case.register_user("presence_sender2", "monkey")
+    test_case.presence_sender_tok = test_case.login("presence_sender2", "monkey")
+
+    # Put them in a room together so they will receive each other's presence updates
+    room_id = test_case.helper.create_room_as(
+        test_case.presence_receiver_id,
+        tok=test_case.presence_receiver_tok,
+    )
+    test_case.helper.join(
+        room_id, test_case.presence_sender_id, tok=test_case.presence_sender_tok
+    )
+
+    # Presence sender comes online
+    send_presence_update(
+        test_case,
+        test_case.presence_sender_id,
+        test_case.presence_sender_tok,
+        "online",
+        "I'm online!",
+    )
+
+    # Presence receiver should have received it
+    presence_updates, sync_token = sync_presence(
+        test_case, test_case.presence_receiver_id
+    )
+    test_case.assertEqual(len(presence_updates), 1)
+
+    presence_update = presence_updates[0]  # type: UserPresenceState
+    test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id)
+    test_case.assertEqual(presence_update.state, "online")
+
+    if test_with_workers:
+        # Replicate the current sync presence token from the main process to the worker process.
+        # We need to do this so that the worker process knows the current presence stream ID to
+        # insert into the database when we call ModuleApi.send_local_online_presence_to.
+        test_case.replicate()
+
+    # Syncing again should result in no presence updates
+    presence_updates, sync_token = sync_presence(
+        test_case, test_case.presence_receiver_id, sync_token
+    )
+    test_case.assertEqual(len(presence_updates), 0)
+
+    # We do an (initial) sync with a second "device" now, getting a new sync token.
+    # We'll use this in a moment.
+    _, sync_token_second_device = sync_presence(
+        test_case, test_case.presence_receiver_id
+    )
+
+    # Determine on which process (main or worker) to call ModuleApi.send_local_online_presence_to on
+    if test_with_workers:
+        module_api_to_use = worker_hs.get_module_api()
+    else:
+        module_api_to_use = test_case.module_api
+
+    # Trigger sending local online presence. We expect this information
+    # to be saved to the database where all processes can access it.
+    # Note that we're syncing via the master.
+    d = module_api_to_use.send_local_online_presence_to(
+        [
+            test_case.presence_receiver_id,
+        ]
+    )
+    d = defer.ensureDeferred(d)
+
+    if test_with_workers:
+        # In order for the required presence_set_state replication request to occur between the
+        # worker and main process, we need to pump the reactor. Otherwise, the coordinator that
+        # reads the request on the main process won't do so, and the request will time out.
+        while not d.called:
+            test_case.reactor.advance(0.1)
+
+    test_case.get_success(d)
+
+    # The presence receiver should have received online presence again.
+    presence_updates, sync_token = sync_presence(
+        test_case, test_case.presence_receiver_id, sync_token
+    )
+    test_case.assertEqual(len(presence_updates), 1)
+
+    presence_update = presence_updates[0]  # type: UserPresenceState
+    test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id)
+    test_case.assertEqual(presence_update.state, "online")
+
+    # We attempt to sync with the second sync token we received above - just to check that
+    # multiple syncing devices will each receive the necessary online presence.
+    presence_updates, sync_token_second_device = sync_presence(
+        test_case, test_case.presence_receiver_id, sync_token_second_device
+    )
+    test_case.assertEqual(len(presence_updates), 1)
+
+    presence_update = presence_updates[0]  # type: UserPresenceState
+    test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id)
+    test_case.assertEqual(presence_update.state, "online")
+
+    # However, if we now sync with either "device", we won't receive another burst of online presence
+    # until the API is called again sometime in the future
+    presence_updates, sync_token = sync_presence(
+        test_case, test_case.presence_receiver_id, sync_token
+    )
+
+    # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to.
+
+    # Presence sender goes offline
+    send_presence_update(
+        test_case,
+        test_case.presence_sender_id,
+        test_case.presence_sender_tok,
+        "offline",
+        "I slink back into the darkness.",
+    )
+
+    # Presence receiver should have received the updated, offline state
+    presence_updates, sync_token = sync_presence(
+        test_case, test_case.presence_receiver_id, sync_token
+    )
+    test_case.assertEqual(len(presence_updates), 1)
+
+    # Now trigger sending local online presence.
+    d = module_api_to_use.send_local_online_presence_to(
+        [
+            test_case.presence_receiver_id,
+        ]
+    )
+    d = defer.ensureDeferred(d)
+
+    if test_with_workers:
+        # In order for the required presence_set_state replication request to occur between the
+        # worker and main process, we need to pump the reactor. Otherwise, the coordinator that
+        # reads the request on the main process won't do so, and the request will time out.
+        while not d.called:
+            test_case.reactor.advance(0.1)
+
+    test_case.get_success(d)
+
+    # Presence receiver should *not* have received offline state
+    presence_updates, sync_token = sync_presence(
+        test_case, test_case.presence_receiver_id, sync_token
+    )
+    test_case.assertEqual(len(presence_updates), 0)