From de0d088adc0cf3d5bbd80238b88143426cd6eaca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 20 Apr 2021 14:11:24 +0100 Subject: Add presence federation stream (#9819) --- tests/handlers/test_presence.py | 179 ++++++++++++++++++++++++++++++++++++++-- 1 file changed, 171 insertions(+), 8 deletions(-) (limited to 'tests') diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 2d12e82897..61271cd084 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -21,6 +21,7 @@ from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder +from synapse.federation.sender import FederationSender from synapse.handlers.presence import ( EXTERNAL_PROCESS_EXPIRY, FEDERATION_PING_INTERVAL, @@ -471,6 +472,168 @@ class PresenceHandlerTestCase(unittest.HomeserverTestCase): self.assertEqual(state.state, PresenceState.OFFLINE) +class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, hs): + self.presence_handler = hs.get_presence_handler() + self.clock = hs.get_clock() + self.instance_name = hs.get_instance_name() + + self.queue = self.presence_handler.get_federation_queue() + + def test_send_and_get(self): + state1 = UserPresenceState.default("@user1:test") + state2 = UserPresenceState.default("@user2:test") + state3 = UserPresenceState.default("@user3:test") + + prev_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + + now_token = self.queue.get_current_token(self.instance_name) + + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", prev_token, now_token, 10) + ) + + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + + expected_rows = [ + (1, ("dest1", "@user1:test")), + (1, ("dest2", "@user1:test")), + (1, ("dest1", "@user2:test")), + (1, ("dest2", "@user2:test")), + (2, ("dest3", "@user3:test")), + ] + + self.assertCountEqual(rows, expected_rows) + + def test_send_and_get_split(self): + state1 = UserPresenceState.default("@user1:test") + state2 = UserPresenceState.default("@user2:test") + state3 = UserPresenceState.default("@user3:test") + + prev_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + + now_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", prev_token, now_token, 10) + ) + + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + + expected_rows = [ + (1, ("dest1", "@user1:test")), + (1, ("dest2", "@user1:test")), + (1, ("dest1", "@user2:test")), + (1, ("dest2", "@user2:test")), + ] + + self.assertCountEqual(rows, expected_rows) + + def test_clear_queue_all(self): + state1 = UserPresenceState.default("@user1:test") + state2 = UserPresenceState.default("@user2:test") + state3 = UserPresenceState.default("@user3:test") + + prev_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + + self.reactor.advance(10 * 60 * 1000) + + now_token = self.queue.get_current_token(self.instance_name) + + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", prev_token, now_token, 10) + ) + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + self.assertCountEqual(rows, []) + + prev_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + + now_token = self.queue.get_current_token(self.instance_name) + + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", prev_token, now_token, 10) + ) + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + + expected_rows = [ + (3, ("dest1", "@user1:test")), + (3, ("dest2", "@user1:test")), + (3, ("dest1", "@user2:test")), + (3, ("dest2", "@user2:test")), + (4, ("dest3", "@user3:test")), + ] + + self.assertCountEqual(rows, expected_rows) + + def test_partially_clear_queue(self): + state1 = UserPresenceState.default("@user1:test") + state2 = UserPresenceState.default("@user2:test") + state3 = UserPresenceState.default("@user3:test") + + prev_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + + self.reactor.advance(2 * 60 * 1000) + + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + + self.reactor.advance(4 * 60 * 1000) + + now_token = self.queue.get_current_token(self.instance_name) + + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", prev_token, now_token, 10) + ) + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + + expected_rows = [ + (2, ("dest3", "@user3:test")), + ] + self.assertCountEqual(rows, []) + + prev_token = self.queue.get_current_token(self.instance_name) + + self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + + now_token = self.queue.get_current_token(self.instance_name) + + rows, upto_token, limited = self.get_success( + self.queue.get_replication_rows("master", prev_token, now_token, 10) + ) + self.assertEqual(upto_token, now_token) + self.assertFalse(limited) + + expected_rows = [ + (3, ("dest1", "@user1:test")), + (3, ("dest2", "@user1:test")), + (3, ("dest1", "@user2:test")), + (3, ("dest2", "@user2:test")), + (4, ("dest3", "@user3:test")), + ] + + self.assertCountEqual(rows, expected_rows) + + class PresenceJoinTestCase(unittest.HomeserverTestCase): """Tests remote servers get told about presence of users in the room when they join and when new local users join. @@ -482,10 +645,17 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): def make_homeserver(self, reactor, clock): hs = self.setup_test_homeserver( - "server", federation_http_client=None, federation_sender=Mock() + "server", + federation_http_client=None, + federation_sender=Mock(spec=FederationSender), ) return hs + def default_config(self): + config = super().default_config() + config["send_federation"] = True + return config + def prepare(self, reactor, clock, hs): self.federation_sender = hs.get_federation_sender() self.event_builder_factory = hs.get_event_builder_factory() @@ -529,9 +699,6 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): # Add a new remote server to the room self._add_new_user(room_id, "@alice:server2") - # We shouldn't have sent out any local presence *updates* - self.federation_sender.send_presence.assert_not_called() - # When new server is joined we send it the local users presence states. # We expect to only see user @test2:server, as @test:server is offline # and has a zero last_active_ts @@ -550,7 +717,6 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self.federation_sender.reset_mock() self._add_new_user(room_id, "@bob:server3") - self.federation_sender.send_presence.assert_not_called() self.federation_sender.send_presence_to_destinations.assert_called_once_with( destinations=["server3"], states={expected_state} ) @@ -595,9 +761,6 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): self.reactor.pump([0]) # Wait for presence updates to be handled - # We shouldn't have sent out any local presence *updates* - self.federation_sender.send_presence.assert_not_called() - # We expect to only send test2 presence to server2 and server3 expected_state = self.get_success( self.presence_handler.current_state_for_user("@test2:server") -- cgit 1.4.1