summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/8272.bugfix1
-rw-r--r--synapse/federation/sender/per_destination_queue.py129
-rw-r--r--synapse/storage/databases/main/transactions.py43
-rw-r--r--tests/federation/test_federation_catch_up.py165
-rw-r--r--tests/handlers/test_typing.py5
5 files changed, 338 insertions, 5 deletions
diff --git a/changelog.d/8272.bugfix b/changelog.d/8272.bugfix
new file mode 100644
index 0000000000..532d0e22fe
--- /dev/null
+++ b/changelog.d/8272.bugfix
@@ -0,0 +1 @@
+Fix messages over federation being lost until an event is sent into the same room.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 9f0852b4a2..2657767fd1 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import datetime
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
 
 from prometheus_client import Counter
 
@@ -92,6 +92,21 @@ class PerDestinationQueue:
         self._destination = destination
         self.transmission_loop_running = False
 
+        # True whilst we are sending events that the remote homeserver missed
+        # because it was unreachable. We start in this state so we can perform
+        # catch-up at startup.
+        # New events will only be sent once this is finished, at which point
+        # _catching_up is flipped to False.
+        self._catching_up = True  # type: bool
+
+        # The stream_ordering of the most recent PDU that was discarded due to
+        # being in catch-up mode.
+        self._catchup_last_skipped = 0  # type: int
+
+        # Cache of the last successfully-transmitted stream ordering for this
+        # destination (we are the only updater so this is safe)
+        self._last_successful_stream_ordering = None  # type: Optional[int]
+
         # a list of pending PDUs
         self._pending_pdus = []  # type: List[EventBase]
 
@@ -138,7 +153,13 @@ class PerDestinationQueue:
         Args:
             pdu: pdu to send
         """
-        self._pending_pdus.append(pdu)
+        if not self._catching_up or self._last_successful_stream_ordering is None:
+            # only enqueue the PDU if we are not catching up (False) or do not
+            # yet know if we have anything to catch up (None)
+            self._pending_pdus.append(pdu)
+        else:
+            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+
         self.attempt_new_transaction()
 
     def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -218,6 +239,13 @@ class PerDestinationQueue:
             # hence why we throw the result away.
             await get_retry_limiter(self._destination, self._clock, self._store)
 
+            if self._catching_up:
+                # we potentially need to catch-up first
+                await self._catch_up_transmission_loop()
+                if self._catching_up:
+                    # not caught up yet
+                    return
+
             pending_pdus = []
             while True:
                 # We have to keep 2 free slots for presence and rr_edus
@@ -351,8 +379,9 @@ class PerDestinationQueue:
             if e.retry_interval > 60 * 60 * 1000:
                 # we won't retry for another hour!
                 # (this suggests a significant outage)
-                # We drop pending PDUs and EDUs because otherwise they will
+                # We drop pending EDUs because otherwise they will
                 # rack up indefinitely.
+                # (Dropping PDUs is already performed by `_start_catching_up`.)
                 # Note that:
                 # - the EDUs that are being dropped here are those that we can
                 #   afford to drop (specifically, only typing notifications,
@@ -364,11 +393,12 @@ class PerDestinationQueue:
 
                 # dropping read receipts is a bit sad but should be solved
                 # through another mechanism, because this is all volatile!
-                self._pending_pdus = []
                 self._pending_edus = []
                 self._pending_edus_keyed = {}
                 self._pending_presence = {}
                 self._pending_rrs = {}
+
+            self._start_catching_up()
         except FederationDeniedError as e:
             logger.info(e)
         except HttpResponseException as e:
@@ -378,6 +408,8 @@ class PerDestinationQueue:
                 e.code,
                 e,
             )
+
+            self._start_catching_up()
         except RequestSendFailed as e:
             logger.warning(
                 "TX [%s] Failed to send transaction: %s", self._destination, e
@@ -387,16 +419,96 @@ class PerDestinationQueue:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._start_catching_up()
         except Exception:
             logger.exception("TX [%s] Failed to send transaction", self._destination)
             for p in pending_pdus:
                 logger.info(
                     "Failed to send event %s to %s", p.event_id, self._destination
                 )
+
+            self._start_catching_up()
         finally:
             # We want to be *very* sure we clear this after we stop processing
             self.transmission_loop_running = False
 
+    async def _catch_up_transmission_loop(self) -> None:
+        first_catch_up_check = self._last_successful_stream_ordering is None
+
+        if first_catch_up_check:
+            # first catchup so get last_successful_stream_ordering from database
+            self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
+                self._destination
+            )
+
+        if self._last_successful_stream_ordering is None:
+            # if it's still None, then this means we don't have the information
+            # in our database ­ we haven't successfully sent a PDU to this server
+            # (at least since the introduction of the feature tracking
+            # last_successful_stream_ordering).
+            # Sadly, this means we can't do anything here as we don't know what
+            # needs catching up — so catching up is futile; let's stop.
+            self._catching_up = False
+            return
+
+        # get at most 50 catchup room/PDUs
+        while True:
+            event_ids = await self._store.get_catch_up_room_event_ids(
+                self._destination, self._last_successful_stream_ordering,
+            )
+
+            if not event_ids:
+                # No more events to catch up on, but we can't ignore the chance
+                # of a race condition, so we check that no new events have been
+                # skipped due to us being in catch-up mode
+
+                if self._catchup_last_skipped > self._last_successful_stream_ordering:
+                    # another event has been skipped because we were in catch-up mode
+                    continue
+
+                # we are done catching up!
+                self._catching_up = False
+                break
+
+            if first_catch_up_check:
+                # as this is our check for needing catch-up, we may have PDUs in
+                # the queue from before we *knew* we had to do catch-up, so
+                # clear those out now.
+                self._start_catching_up()
+
+            # fetch the relevant events from the event store
+            # - redacted behaviour of REDACT is fine, since we only send metadata
+            #   of redacted events to the destination.
+            # - don't need to worry about rejected events as we do not actively
+            #   forward received events over federation.
+            catchup_pdus = await self._store.get_events_as_list(event_ids)
+            if not catchup_pdus:
+                raise AssertionError(
+                    "No events retrieved when we asked for %r. "
+                    "This should not happen." % event_ids
+                )
+
+            if logger.isEnabledFor(logging.INFO):
+                rooms = (p.room_id for p in catchup_pdus)
+                logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+
+            success = await self._transaction_manager.send_new_transaction(
+                self._destination, catchup_pdus, []
+            )
+
+            if not success:
+                return
+
+            sent_transactions_counter.inc()
+            final_pdu = catchup_pdus[-1]
+            self._last_successful_stream_ordering = cast(
+                int, final_pdu.internal_metadata.stream_ordering
+            )
+            await self._store.set_destination_last_successful_stream_ordering(
+                self._destination, self._last_successful_stream_ordering
+            )
+
     def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
         if not self._pending_rrs:
             return
@@ -457,3 +569,12 @@ class PerDestinationQueue:
         ]
 
         return (edus, stream_id)
+
+    def _start_catching_up(self) -> None:
+        """
+        Marks this destination as being in catch-up mode.
+
+        This throws away the PDU queue.
+        """
+        self._catching_up = True
+        self._pending_pdus = []
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index c0a958252e..091367006e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore):
             values={"last_successful_stream_ordering": last_successful_stream_ordering},
             desc="set_last_successful_stream_ordering",
         )
+
+    async def get_catch_up_room_event_ids(
+        self, destination: str, last_successful_stream_ordering: int,
+    ) -> List[str]:
+        """
+        Returns at most 50 event IDs and their corresponding stream_orderings
+        that correspond to the oldest events that have not yet been sent to
+        the destination.
+
+        Args:
+            destination: the destination in question
+            last_successful_stream_ordering: the stream_ordering of the
+                most-recently successfully-transmitted event to the destination
+
+        Returns:
+            list of event_ids
+        """
+        return await self.db_pool.runInteraction(
+            "get_catch_up_room_event_ids",
+            self._get_catch_up_room_event_ids_txn,
+            destination,
+            last_successful_stream_ordering,
+        )
+
+    @staticmethod
+    def _get_catch_up_room_event_ids_txn(
+        txn, destination: str, last_successful_stream_ordering: int,
+    ) -> List[str]:
+        q = """
+                SELECT event_id FROM destination_rooms
+                 JOIN events USING (stream_ordering)
+                WHERE destination = ?
+                  AND stream_ordering > ?
+                ORDER BY stream_ordering
+                LIMIT 50
+            """
+        txn.execute(
+            q, (destination, last_successful_stream_ordering),
+        )
+        event_ids = [row[0] for row in txn]
+        return event_ids
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 6cdcc378f0..cc52c3dfac 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -1,5 +1,10 @@
+from typing import List, Tuple
+
 from mock import Mock
 
+from synapse.events import EventBase
+from synapse.federation.sender import PerDestinationQueue, TransactionManager
+from synapse.federation.units import Edu
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, room
 
@@ -156,3 +161,163 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             row_2["stream_ordering"],
             "Send succeeded but not marked as last_successful_stream_ordering",
         )
+
+    @override_config({"send_federation": True})  # critical to federate
+    def test_catch_up_from_blank_state(self):
+        """
+        Runs an overall test of federation catch-up from scratch.
+        Further tests will focus on more narrow aspects and edge-cases, but I
+        hope to provide an overall view with this test.
+        """
+        # bring the other server online
+        self.is_online = True
+
+        # let's make some events for the other server to receive
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+
+        # also critical to federate
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
+        )
+
+        # check: PDU received for topic event
+        self.assertEqual(len(self.pdus), 1)
+        self.assertEqual(self.pdus[0]["type"], "m.room.topic")
+
+        # take the remote offline
+        self.is_online = False
+
+        # send another event
+        self.helper.send(room_1, "hi user!", tok=u1_token)
+
+        # check: things didn't go well since the remote is down
+        self.assertEqual(len(self.failed_pdus), 1)
+        self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
+
+        # let's delete the federation transmission queue
+        # (this pretends we are starting up fresh.)
+        self.assertFalse(
+            self.hs.get_federation_sender()
+            ._per_destination_queues["host2"]
+            .transmission_loop_running
+        )
+        del self.hs.get_federation_sender()._per_destination_queues["host2"]
+
+        # let's also clear any backoffs
+        self.get_success(
+            self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
+        )
+
+        # bring the remote online and clear the received pdu list
+        self.is_online = True
+        self.pdus = []
+
+        # now we need to initiate a federation transaction somehow…
+        # to do that, let's send another event (because it's simple to do)
+        # (do it to another room otherwise the catch-up logic decides it doesn't
+        # need to catch up room_1 — something I overlooked when first writing
+        # this test)
+        self.helper.send(room_2, "wombats!", tok=u1_token)
+
+        # we should now have received both PDUs
+        self.assertEqual(len(self.pdus), 2)
+        self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
+        self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
+
+    def make_fake_destination_queue(
+        self, destination: str = "host2"
+    ) -> Tuple[PerDestinationQueue, List[EventBase]]:
+        """
+        Makes a fake per-destination queue.
+        """
+        transaction_manager = TransactionManager(self.hs)
+        per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
+        results_list = []
+
+        async def fake_send(
+            destination_tm: str,
+            pending_pdus: List[EventBase],
+            _pending_edus: List[Edu],
+        ) -> bool:
+            assert destination == destination_tm
+            results_list.extend(pending_pdus)
+            return True  # success!
+
+        transaction_manager.send_new_transaction = fake_send
+
+        return per_dest_queue, results_list
+
+    @override_config({"send_federation": True})
+    def test_catch_up_loop(self):
+        """
+        Tests the behaviour of _catch_up_transmission_loop.
+        """
+
+        # ARRANGE:
+        #  - a local user (u1)
+        #  - 3 rooms which u1 is joined to (and remote user @user:host2 is
+        #    joined to)
+        #  - some events (1 to 5) in those rooms
+        #      we have 'already sent' events 1 and 2 to host2
+        per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_1 = self.helper.create_room_as("u1", tok=u1_token)
+        room_2 = self.helper.create_room_as("u1", tok=u1_token)
+        room_3 = self.helper.create_room_as("u1", tok=u1_token)
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+        )
+        self.get_success(
+            event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+        )
+
+        # create some events
+        self.helper.send(room_1, "you hear me!!", tok=u1_token)
+        event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
+        self.helper.send(room_3, "Matrix!", tok=u1_token)
+        event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
+        event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
+
+        # destination_rooms should already be populated, but let us pretend that we already
+        # sent (successfully) up to and including event id 2
+        event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
+
+        # also fetch event 5 so we know its last_successful_stream_ordering later
+        event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
+
+        self.get_success(
+            self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+                "host2", event_2.internal_metadata.stream_ordering
+            )
+        )
+
+        # ACT
+        self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+        # ASSERT, noticing in particular:
+        # - event 3 not sent out, because event 5 replaces it
+        # - order is least recent first, so event 5 comes after event 4
+        # - catch-up is completed
+        self.assertEqual(len(sent_pdus), 2)
+        self.assertEqual(sent_pdus[0].event_id, event_id_4)
+        self.assertEqual(sent_pdus[1].event_id, event_id_5)
+        self.assertFalse(per_dest_queue._catching_up)
+        self.assertEqual(
+            per_dest_queue._last_successful_stream_ordering,
+            event_5.internal_metadata.stream_ordering,
+        )
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index f306a09bfa..3fec09ea8a 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -73,6 +73,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
                 "delivered_txn",
                 "get_received_txn_response",
                 "set_received_txn_response",
+                "get_destination_last_successful_stream_ordering",
                 "get_destination_retry_timings",
                 "get_devices_by_remote",
                 "maybe_store_room_on_invite",
@@ -121,6 +122,10 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
             (0, [])
         )
 
+        self.datastore.get_destination_last_successful_stream_ordering.return_value = make_awaitable(
+            None
+        )
+
         def get_received_txn_response(*args):
             return defer.succeed(None)