summary refs log tree commit diff
path: root/tests/federation/test_federation_catch_up.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/federation/test_federation_catch_up.py')
-rw-r--r--tests/federation/test_federation_catch_up.py165
1 files changed, 165 insertions, 0 deletions
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 6cdcc378f0..cc52c3dfac 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -1,5 +1,10 @@
+from typing import List, Tuple
+
 from mock import Mock
 
+from synapse.events import EventBase
+from synapse.federation.sender import PerDestinationQueue, TransactionManager
+from synapse.federation.units import Edu
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, room
 
@@ -156,3 +161,163 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             row_2["stream_ordering"],
             "Send succeeded but not marked as last_successful_stream_ordering",
         )
+
+    @override_config({"send_federation": True})  # critical to federate
+    def test_catch_up_from_blank_state(self):
+        """
+        Runs an overall test of federation catch-up from scratch.
+        Further tests will focus on more narrow aspects and edge-cases, but I
+        hope to provide an overall view with this test.
+        """
+        # bring the other server online
+        self.is_online = True
+
+        # let's make some events for the other server to receive
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+
+        # also critical to federate
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
+        )
+
+        # check: PDU received for topic event
+        self.assertEqual(len(self.pdus), 1)
+        self.assertEqual(self.pdus[0]["type"], "m.room.topic")
+
+        # take the remote offline
+        self.is_online = False
+
+        # send another event
+        self.helper.send(room_1, "hi user!", tok=u1_token)
+
+        # check: things didn't go well since the remote is down
+        self.assertEqual(len(self.failed_pdus), 1)
+        self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
+
+        # let's delete the federation transmission queue
+        # (this pretends we are starting up fresh.)
+        self.assertFalse(
+            self.hs.get_federation_sender()
+            ._per_destination_queues["host2"]
+            .transmission_loop_running
+        )
+        del self.hs.get_federation_sender()._per_destination_queues["host2"]
+
+        # let's also clear any backoffs
+        self.get_success(
+            self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
+        )
+
+        # bring the remote online and clear the received pdu list
+        self.is_online = True
+        self.pdus = []
+
+        # now we need to initiate a federation transaction somehow…
+        # to do that, let's send another event (because it's simple to do)
+        # (do it to another room otherwise the catch-up logic decides it doesn't
+        # need to catch up room_1 — something I overlooked when first writing
+        # this test)
+        self.helper.send(room_2, "wombats!", tok=u1_token)
+
+        # we should now have received both PDUs
+        self.assertEqual(len(self.pdus), 2)
+        self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
+        self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
+
+    def make_fake_destination_queue(
+        self, destination: str = "host2"
+    ) -> Tuple[PerDestinationQueue, List[EventBase]]:
+        """
+        Makes a fake per-destination queue.
+        """
+        transaction_manager = TransactionManager(self.hs)
+        per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
+        results_list = []
+
+        async def fake_send(
+            destination_tm: str,
+            pending_pdus: List[EventBase],
+            _pending_edus: List[Edu],
+        ) -> bool:
+            assert destination == destination_tm
+            results_list.extend(pending_pdus)
+            return True  # success!
+
+        transaction_manager.send_new_transaction = fake_send
+
+        return per_dest_queue, results_list
+
+    @override_config({"send_federation": True})
+    def test_catch_up_loop(self):
+        """
+        Tests the behaviour of _catch_up_transmission_loop.
+        """
+
+        # ARRANGE:
+        #  - a local user (u1)
+        #  - 3 rooms which u1 is joined to (and remote user @user:host2 is
+        #    joined to)
+        #  - some events (1 to 5) in those rooms
+        #      we have 'already sent' events 1 and 2 to host2
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+        room_3 = self.helper.create_room_as("u1", tok=u1_token)
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        )
+
+        # create some events
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+        event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
+        self.helper.send(room_3, "Matrix!", tok=u1_token)
+        event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
+        event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
+
+        # destination_rooms should already be populated, but let us pretend that we already
+        # sent (successfully) up to and including event id 2
+        event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
+
+        # also fetch event 5 so we know its last_successful_stream_ordering later
+        event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
+
+        self.get_success(
+            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+                "host2", event_2.internal_metadata.stream_ordering
+            )
+        )
+
+        # ACT
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # ASSERT, noticing in particular:
+        # - event 3 not sent out, because event 5 replaces it
+        # - order is least recent first, so event 5 comes after event 4
+        # - catch-up is completed
+        self.assertEqual(len(sent_pdus), 2)
+        self.assertEqual(sent_pdus[0].event_id, event_id_4)
+        self.assertEqual(sent_pdus[1].event_id, event_id_5)
+        self.assertFalse(per_dest_queue._catching_up)
+        self.assertEqual(
+            per_dest_queue._last_successful_stream_ordering,
+            event_5.internal_metadata.stream_ordering,
+        )