diff --git a/tests/events/test_presence_router.py b/tests/events/test_presence_router.py
index 01d257307c..875b0d0a11 100644
--- a/tests/events/test_presence_router.py
+++ b/tests/events/test_presence_router.py
@@ -302,11 +302,18 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
)
# Check that the expected presence updates were sent
- expected_users = [
+ # We explicitly compare using sets as we expect that calling
+ # module_api.send_local_online_presence_to will create a presence
+ # update that is a duplicate of the specified user's current presence.
+ # These are sent to clients and will be picked up below, thus we use a
+ # set to deduplicate. We're just interested that non-offline updates were
+ # sent out for each user ID.
+ expected_users = {
self.other_user_id,
self.presence_receiving_user_one_id,
self.presence_receiving_user_two_id,
- ]
+ }
+ found_users = set()
calls = (
self.hs.get_federation_transport_client().send_transaction.call_args_list
@@ -326,12 +333,12 @@ class PresenceRouterTestCase(FederatingHomeserverTestCase):
# EDUs can contain multiple presence updates
for presence_update in edu["content"]["push"]:
# Check for presence updates that contain the user IDs we're after
- expected_users.remove(presence_update["user_id"])
+ found_users.add(presence_update["user_id"])
# Ensure that no offline states are being sent out
self.assertNotEqual(presence_update["presence"], "offline")
- self.assertEqual(len(expected_users), 0)
+ self.assertEqual(found_users, expected_users)
def send_presence_update(
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 742ad14b8c..2c68b9a13c 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -13,6 +13,8 @@
# limitations under the License.
from unittest.mock import Mock
+from twisted.internet import defer
+
from synapse.api.constants import EduTypes
from synapse.events import EventBase
from synapse.federation.units import Transaction
@@ -22,11 +24,13 @@ from synapse.rest.client.v1 import login, presence, room
from synapse.types import create_requester
from tests.events.test_presence_router import send_presence_update, sync_presence
+from tests.replication._base import BaseMultiWorkerStreamTestCase
from tests.test_utils.event_injection import inject_member_event
-from tests.unittest import FederatingHomeserverTestCase, override_config
+from tests.unittest import HomeserverTestCase, override_config
+from tests.utils import USE_POSTGRES_FOR_TESTS
-class ModuleApiTestCase(FederatingHomeserverTestCase):
+class ModuleApiTestCase(HomeserverTestCase):
servlets = [
admin.register_servlets,
login.register_servlets,
@@ -217,97 +221,16 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
)
self.assertFalse(is_in_public_rooms)
- # The ability to send federation is required by send_local_online_presence_to.
- @override_config({"send_federation": True})
def test_send_local_online_presence_to(self):
- """Tests that send_local_presence_to_users sends local online presence to local users."""
- # Create a user who will send presence updates
- self.presence_receiver_id = self.register_user("presence_receiver", "monkey")
- self.presence_receiver_tok = self.login("presence_receiver", "monkey")
-
- # And another user that will send presence updates out
- self.presence_sender_id = self.register_user("presence_sender", "monkey")
- self.presence_sender_tok = self.login("presence_sender", "monkey")
-
- # Put them in a room together so they will receive each other's presence updates
- room_id = self.helper.create_room_as(
- self.presence_receiver_id,
- tok=self.presence_receiver_tok,
- )
- self.helper.join(room_id, self.presence_sender_id, tok=self.presence_sender_tok)
-
- # Presence sender comes online
- send_presence_update(
- self,
- self.presence_sender_id,
- self.presence_sender_tok,
- "online",
- "I'm online!",
- )
-
- # Presence receiver should have received it
- presence_updates, sync_token = sync_presence(self, self.presence_receiver_id)
- self.assertEqual(len(presence_updates), 1)
-
- presence_update = presence_updates[0] # type: UserPresenceState
- self.assertEqual(presence_update.user_id, self.presence_sender_id)
- self.assertEqual(presence_update.state, "online")
-
- # Syncing again should result in no presence updates
- presence_updates, sync_token = sync_presence(
- self, self.presence_receiver_id, sync_token
- )
- self.assertEqual(len(presence_updates), 0)
-
- # Trigger sending local online presence
- self.get_success(
- self.module_api.send_local_online_presence_to(
- [
- self.presence_receiver_id,
- ]
- )
- )
-
- # Presence receiver should have received online presence again
- presence_updates, sync_token = sync_presence(
- self, self.presence_receiver_id, sync_token
- )
- self.assertEqual(len(presence_updates), 1)
-
- presence_update = presence_updates[0] # type: UserPresenceState
- self.assertEqual(presence_update.user_id, self.presence_sender_id)
- self.assertEqual(presence_update.state, "online")
-
- # Presence sender goes offline
- send_presence_update(
- self,
- self.presence_sender_id,
- self.presence_sender_tok,
- "offline",
- "I slink back into the darkness.",
- )
-
- # Trigger sending local online presence
- self.get_success(
- self.module_api.send_local_online_presence_to(
- [
- self.presence_receiver_id,
- ]
- )
- )
-
- # Presence receiver should *not* have received offline state
- presence_updates, sync_token = sync_presence(
- self, self.presence_receiver_id, sync_token
- )
- self.assertEqual(len(presence_updates), 0)
+ # Test sending local online presence to users from the main process
+ _test_sending_local_online_presence_to_local_user(self, test_with_workers=False)
@override_config({"send_federation": True})
def test_send_local_online_presence_to_federation(self):
"""Tests that send_local_presence_to_users sends local online presence to remote users."""
# Create a user who will send presence updates
- self.presence_sender_id = self.register_user("presence_sender", "monkey")
- self.presence_sender_tok = self.login("presence_sender", "monkey")
+ self.presence_sender_id = self.register_user("presence_sender1", "monkey")
+ self.presence_sender_tok = self.login("presence_sender1", "monkey")
# And a room they're a part of
room_id = self.helper.create_room_as(
@@ -374,3 +297,209 @@ class ModuleApiTestCase(FederatingHomeserverTestCase):
found_update = True
self.assertTrue(found_update)
+
+
+class ModuleApiWorkerTestCase(BaseMultiWorkerStreamTestCase):
+ """For testing ModuleApi functionality in a multi-worker setup"""
+
+ # Testing stream ID replication from the main to worker processes requires postgres
+ # (due to needing `MultiWriterIdGenerator`).
+ if not USE_POSTGRES_FOR_TESTS:
+ skip = "Requires Postgres"
+
+ servlets = [
+ admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ presence.register_servlets,
+ ]
+
+ def default_config(self):
+ conf = super().default_config()
+ conf["redis"] = {"enabled": "true"}
+ conf["stream_writers"] = {"presence": ["presence_writer"]}
+ conf["instance_map"] = {
+ "presence_writer": {"host": "testserv", "port": 1001},
+ }
+ return conf
+
+ def prepare(self, reactor, clock, homeserver):
+ self.module_api = homeserver.get_module_api()
+ self.sync_handler = homeserver.get_sync_handler()
+
+ def test_send_local_online_presence_to_workers(self):
+ # Test sending local online presence to users from a worker process
+ _test_sending_local_online_presence_to_local_user(self, test_with_workers=True)
+
+
+def _test_sending_local_online_presence_to_local_user(
+ test_case: HomeserverTestCase, test_with_workers: bool = False
+):
+ """Tests that send_local_presence_to_users sends local online presence to local users.
+
+ This simultaneously tests two different usecases:
+ * Testing that this method works when either called from a worker or the main process.
+ - We test this by calling this method from both a TestCase that runs in monolith mode, and one that
+ runs with a main and generic_worker.
+ * Testing that multiple devices syncing simultaneously will all receive a snapshot of local,
+ online presence - but only once per device.
+
+ Args:
+ test_with_workers: If True, this method will call ModuleApi.send_local_online_presence_to on a
+ worker process. The test users will still sync with the main process. The purpose of testing
+ with a worker is to check whether a Synapse module running on a worker can inform other workers/
+ the main process that they should include additional presence when a user next syncs.
+ """
+ if test_with_workers:
+ # Create a worker process to make module_api calls against
+ worker_hs = test_case.make_worker_hs(
+ "synapse.app.generic_worker", {"worker_name": "presence_writer"}
+ )
+
+ # Create a user who will send presence updates
+ test_case.presence_receiver_id = test_case.register_user(
+ "presence_receiver1", "monkey"
+ )
+ test_case.presence_receiver_tok = test_case.login("presence_receiver1", "monkey")
+
+ # And another user that will send presence updates out
+ test_case.presence_sender_id = test_case.register_user("presence_sender2", "monkey")
+ test_case.presence_sender_tok = test_case.login("presence_sender2", "monkey")
+
+ # Put them in a room together so they will receive each other's presence updates
+ room_id = test_case.helper.create_room_as(
+ test_case.presence_receiver_id,
+ tok=test_case.presence_receiver_tok,
+ )
+ test_case.helper.join(
+ room_id, test_case.presence_sender_id, tok=test_case.presence_sender_tok
+ )
+
+ # Presence sender comes online
+ send_presence_update(
+ test_case,
+ test_case.presence_sender_id,
+ test_case.presence_sender_tok,
+ "online",
+ "I'm online!",
+ )
+
+ # Presence receiver should have received it
+ presence_updates, sync_token = sync_presence(
+ test_case, test_case.presence_receiver_id
+ )
+ test_case.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id)
+ test_case.assertEqual(presence_update.state, "online")
+
+ if test_with_workers:
+ # Replicate the current sync presence token from the main process to the worker process.
+ # We need to do this so that the worker process knows the current presence stream ID to
+ # insert into the database when we call ModuleApi.send_local_online_presence_to.
+ test_case.replicate()
+
+ # Syncing again should result in no presence updates
+ presence_updates, sync_token = sync_presence(
+ test_case, test_case.presence_receiver_id, sync_token
+ )
+ test_case.assertEqual(len(presence_updates), 0)
+
+ # We do an (initial) sync with a second "device" now, getting a new sync token.
+ # We'll use this in a moment.
+ _, sync_token_second_device = sync_presence(
+ test_case, test_case.presence_receiver_id
+ )
+
+ # Determine on which process (main or worker) to call ModuleApi.send_local_online_presence_to on
+ if test_with_workers:
+ module_api_to_use = worker_hs.get_module_api()
+ else:
+ module_api_to_use = test_case.module_api
+
+ # Trigger sending local online presence. We expect this information
+ # to be saved to the database where all processes can access it.
+ # Note that we're syncing via the master.
+ d = module_api_to_use.send_local_online_presence_to(
+ [
+ test_case.presence_receiver_id,
+ ]
+ )
+ d = defer.ensureDeferred(d)
+
+ if test_with_workers:
+ # In order for the required presence_set_state replication request to occur between the
+ # worker and main process, we need to pump the reactor. Otherwise, the coordinator that
+ # reads the request on the main process won't do so, and the request will time out.
+ while not d.called:
+ test_case.reactor.advance(0.1)
+
+ test_case.get_success(d)
+
+ # The presence receiver should have received online presence again.
+ presence_updates, sync_token = sync_presence(
+ test_case, test_case.presence_receiver_id, sync_token
+ )
+ test_case.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id)
+ test_case.assertEqual(presence_update.state, "online")
+
+ # We attempt to sync with the second sync token we received above - just to check that
+ # multiple syncing devices will each receive the necessary online presence.
+ presence_updates, sync_token_second_device = sync_presence(
+ test_case, test_case.presence_receiver_id, sync_token_second_device
+ )
+ test_case.assertEqual(len(presence_updates), 1)
+
+ presence_update = presence_updates[0] # type: UserPresenceState
+ test_case.assertEqual(presence_update.user_id, test_case.presence_sender_id)
+ test_case.assertEqual(presence_update.state, "online")
+
+ # However, if we now sync with either "device", we won't receive another burst of online presence
+ # until the API is called again sometime in the future
+ presence_updates, sync_token = sync_presence(
+ test_case, test_case.presence_receiver_id, sync_token
+ )
+
+ # Now we check that we don't receive *offline* updates using ModuleApi.send_local_online_presence_to.
+
+ # Presence sender goes offline
+ send_presence_update(
+ test_case,
+ test_case.presence_sender_id,
+ test_case.presence_sender_tok,
+ "offline",
+ "I slink back into the darkness.",
+ )
+
+ # Presence receiver should have received the updated, offline state
+ presence_updates, sync_token = sync_presence(
+ test_case, test_case.presence_receiver_id, sync_token
+ )
+ test_case.assertEqual(len(presence_updates), 1)
+
+ # Now trigger sending local online presence.
+ d = module_api_to_use.send_local_online_presence_to(
+ [
+ test_case.presence_receiver_id,
+ ]
+ )
+ d = defer.ensureDeferred(d)
+
+ if test_with_workers:
+ # In order for the required presence_set_state replication request to occur between the
+ # worker and main process, we need to pump the reactor. Otherwise, the coordinator that
+ # reads the request on the main process won't do so, and the request will time out.
+ while not d.called:
+ test_case.reactor.advance(0.1)
+
+ test_case.get_success(d)
+
+ # Presence receiver should *not* have received offline state
+ presence_updates, sync_token = sync_presence(
+ test_case, test_case.presence_receiver_id, sync_token
+ )
+ test_case.assertEqual(len(presence_updates), 0)
diff --git a/tests/replication/test_sharded_event_persister.py b/tests/replication/test_sharded_event_persister.py
index d739eb6b17..5eca5c165d 100644
--- a/tests/replication/test_sharded_event_persister.py
+++ b/tests/replication/test_sharded_event_persister.py
@@ -30,7 +30,7 @@ class EventPersisterShardTestCase(BaseMultiWorkerStreamTestCase):
"""Checks event persisting sharding works"""
# Event persister sharding requires postgres (due to needing
- # `MutliWriterIdGenerator`).
+ # `MultiWriterIdGenerator`).
if not USE_POSTGRES_FOR_TESTS:
skip = "Requires Postgres"
|