summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/8230.bugfix1
-rw-r--r--changelog.d/8230.misc1
-rw-r--r--changelog.d/8247.bugfix1
-rw-r--r--changelog.d/8247.misc1
-rw-r--r--changelog.d/8258.bugfix1
-rw-r--r--changelog.d/8258.misc1
-rw-r--r--changelog.d/8322.bugfix1
-rw-r--r--synapse/federation/sender/__init__.py51
-rw-r--r--synapse/storage/databases/main/transactions.py66
-rw-r--r--tests/federation/test_federation_catch_up.py99
10 files changed, 218 insertions, 5 deletions
diff --git a/changelog.d/8230.bugfix b/changelog.d/8230.bugfix
new file mode 100644
index 0000000000..532d0e22fe
--- /dev/null
+++ b/changelog.d/8230.bugfix
@@ -0,0 +1 @@
+Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8230.misc b/changelog.d/8230.misc
deleted file mode 100644
index bf0ba76730..0000000000
--- a/changelog.d/8230.misc
+++ /dev/null
@@ -1 +0,0 @@
-Track the latest event for every destination and room for catch-up after federation outage.
diff --git a/changelog.d/8247.bugfix b/changelog.d/8247.bugfix
new file mode 100644
index 0000000000..532d0e22fe
--- /dev/null
+++ b/changelog.d/8247.bugfix
@@ -0,0 +1 @@
+Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8247.misc b/changelog.d/8247.misc
deleted file mode 100644
index 3c27803be4..0000000000
--- a/changelog.d/8247.misc
+++ /dev/null
@@ -1 +0,0 @@
-Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
diff --git a/changelog.d/8258.bugfix b/changelog.d/8258.bugfix
new file mode 100644
index 0000000000..532d0e22fe
--- /dev/null
+++ b/changelog.d/8258.bugfix
@@ -0,0 +1 @@
+Fix messages over federation being lost until an event is sent into the same room.
diff --git a/changelog.d/8258.misc b/changelog.d/8258.misc
deleted file mode 100644
index 3c27803be4..0000000000
--- a/changelog.d/8258.misc
+++ /dev/null
@@ -1 +0,0 @@
-Track the `stream_ordering` of the last successfully-sent event to every destination, so we can use this information to 'catch up' a remote server after an outage.
diff --git a/changelog.d/8322.bugfix b/changelog.d/8322.bugfix
new file mode 100644
index 0000000000..532d0e22fe
--- /dev/null
+++ b/changelog.d/8322.bugfix
@@ -0,0 +1 @@
+Fix messages over federation being lost until an event is sent into the same room.
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 41a726878d..8bb17b3a05 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -55,6 +55,15 @@ sent_pdus_destination_dist_total = Counter(
     "Total number of PDUs queued for sending across all destinations",
 )
 
+# Time (in s) after Synapse's startup that we will begin to wake up destinations
+# that have catch-up outstanding.
+CATCH_UP_STARTUP_DELAY_SEC = 15
+
+# Time (in s) to wait in between waking up each destination, i.e. one destination
+# will be woken up every <x> seconds after Synapse's startup until we have woken
+# every destination has outstanding catch-up.
+CATCH_UP_STARTUP_INTERVAL_SEC = 5
+
 
 class FederationSender:
     def __init__(self, hs: "synapse.server.HomeServer"):
@@ -125,6 +134,14 @@ class FederationSender:
             1000.0 / hs.config.federation_rr_transactions_per_room_per_second
         )
 
+        # wake up destinations that have outstanding PDUs to be caught up
+        self._catchup_after_startup_timer = self.clock.call_later(
+            CATCH_UP_STARTUP_DELAY_SEC,
+            run_as_background_process,
+            "wake_destinations_needing_catchup",
+            self._wake_destinations_needing_catchup,
+        )
+
     def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue:
         """Get or create a PerDestinationQueue for the given destination
 
@@ -560,3 +577,37 @@ class FederationSender:
         # Dummy implementation for case where federation sender isn't offloaded
         # to a worker.
         return [], 0, False
+
+    async def _wake_destinations_needing_catchup(self):
+        """
+        Wakes up destinations that need catch-up and are not currently being
+        backed off from.
+
+        In order to reduce load spikes, adds a delay between each destination.
+        """
+
+        last_processed = None  # type: Optional[str]
+
+        while True:
+            destinations_to_wake = await self.store.get_catch_up_outstanding_destinations(
+                last_processed
+            )
+
+            if not destinations_to_wake:
+                # finished waking all destinations!
+                self._catchup_after_startup_timer = None
+                break
+
+            destinations_to_wake = [
+                d
+                for d in destinations_to_wake
+                if self._federation_shard_config.should_handle(self._instance_name, d)
+            ]
+
+            for last_processed in destinations_to_wake:
+                logger.info(
+                    "Destination %s has outstanding catch-up, waking up.",
+                    last_processed,
+                )
+                self.wake_destination(last_processed)
+                await self.clock.sleep(CATCH_UP_STARTUP_INTERVAL_SEC)
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index 99cffff50c..97aed1500e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -218,6 +218,7 @@ class TransactionStore(SQLBaseStore):
                         retry_interval = EXCLUDED.retry_interval
                     WHERE
                         EXCLUDED.retry_interval = 0
+                        OR destinations.retry_interval IS NULL
                         OR destinations.retry_interval < EXCLUDED.retry_interval
             """
 
@@ -249,7 +250,11 @@ class TransactionStore(SQLBaseStore):
                     "retry_interval": retry_interval,
                 },
             )
-        elif retry_interval == 0 or prev_row["retry_interval"] < retry_interval:
+        elif (
+            retry_interval == 0
+            or prev_row["retry_interval"] is None
+            or prev_row["retry_interval"] < retry_interval
+        ):
             self.db_pool.simple_update_one_txn(
                 txn,
                 "destinations",
@@ -397,7 +402,7 @@ class TransactionStore(SQLBaseStore):
 
     @staticmethod
     def _get_catch_up_room_event_ids_txn(
-        txn, destination: str, last_successful_stream_ordering: int,
+        txn: LoggingTransaction, destination: str, last_successful_stream_ordering: int,
     ) -> List[str]:
         q = """
                 SELECT event_id FROM destination_rooms
@@ -412,3 +417,60 @@ class TransactionStore(SQLBaseStore):
         )
         event_ids = [row[0] for row in txn]
         return event_ids
+
+    async def get_catch_up_outstanding_destinations(
+        self, after_destination: Optional[str]
+    ) -> List[str]:
+        """
+        Gets at most 25 destinations which have outstanding PDUs to be caught up,
+        and are not being backed off from
+        Args:
+            after_destination:
+                If provided, all destinations must be lexicographically greater
+                than this one.
+
+        Returns:
+            list of up to 25 destinations with outstanding catch-up.
+                These are the lexicographically first destinations which are
+                lexicographically greater than after_destination (if provided).
+        """
+        time = self.hs.get_clock().time_msec()
+
+        return await self.db_pool.runInteraction(
+            "get_catch_up_outstanding_destinations",
+            self._get_catch_up_outstanding_destinations_txn,
+            time,
+            after_destination,
+        )
+
+    @staticmethod
+    def _get_catch_up_outstanding_destinations_txn(
+        txn: LoggingTransaction, now_time_ms: int, after_destination: Optional[str]
+    ) -> List[str]:
+        q = """
+            SELECT destination FROM destinations
+                WHERE destination IN (
+                    SELECT destination FROM destination_rooms
+                        WHERE destination_rooms.stream_ordering >
+                            destinations.last_successful_stream_ordering
+                )
+                AND destination > ?
+                AND (
+                    retry_last_ts IS NULL OR
+                    retry_last_ts + retry_interval < ?
+                )
+                ORDER BY destination
+                LIMIT 25
+        """
+        txn.execute(
+            q,
+            (
+                # everything is lexicographically greater than "" so this gives
+                # us the first batch of up to 25.
+                after_destination or "",
+                now_time_ms,
+            ),
+        )
+
+        destinations = [row[0] for row in txn]
+        return destinations
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index cc52c3dfac..1a3ccb263d 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -321,3 +321,102 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
             per_dest_queue._last_successful_stream_ordering,
             event_5.internal_metadata.stream_ordering,
         )
+
+    @override_config({"send_federation": True})
+    def test_catch_up_on_synapse_startup(self):
+        """
+        Tests the behaviour of get_catch_up_outstanding_destinations and
+            _wake_destinations_needing_catchup.
+        """
+
+        # list of sorted server names (note that there are more servers than the batch
+        # size used in get_catch_up_outstanding_destinations).
+        server_names = ["server%02d" % number for number in range(42)] + ["zzzerver"]
+
+        # ARRANGE:
+        #  - a local user (u1)
+        #  - a room which u1 is joined to (and remote users @user:serverXX are
+        #    joined to)
+
+        # mark the remotes as online
+        self.is_online = True
+
+        self.register_user("u1", "you the one")
+        u1_token = self.login("u1", "you the one")
+        room_id = self.helper.create_room_as("u1", tok=u1_token)
+
+        for server_name in server_names:
+            self.get_success(
+                event_injection.inject_member_event(
+                    self.hs, room_id, "@user:%s" % server_name, "join"
+                )
+            )
+
+        # create an event
+        self.helper.send(room_id, "deary me!", tok=u1_token)
+
+        # ASSERT:
+        # - All servers are up to date so none should have outstanding catch-up
+        outstanding_when_successful = self.get_success(
+            self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
+        )
+        self.assertEqual(outstanding_when_successful, [])
+
+        # ACT:
+        # - Make the remote servers unreachable
+        self.is_online = False
+
+        # - Mark zzzerver as being backed-off from
+        now = self.clock.time_msec()
+        self.get_success(
+            self.hs.get_datastore().set_destination_retry_timings(
+                "zzzerver", now, now, 24 * 60 * 60 * 1000  # retry in 1 day
+            )
+        )
+
+        # - Send an event
+        self.helper.send(room_id, "can anyone hear me?", tok=u1_token)
+
+        # ASSERT (get_catch_up_outstanding_destinations):
+        # - all remotes are outstanding
+        # - they are returned in batches of 25, in order
+        outstanding_1 = self.get_success(
+            self.hs.get_datastore().get_catch_up_outstanding_destinations(None)
+        )
+
+        self.assertEqual(len(outstanding_1), 25)
+        self.assertEqual(outstanding_1, server_names[0:25])
+
+        outstanding_2 = self.get_success(
+            self.hs.get_datastore().get_catch_up_outstanding_destinations(
+                outstanding_1[-1]
+            )
+        )
+        self.assertNotIn("zzzerver", outstanding_2)
+        self.assertEqual(len(outstanding_2), 17)
+        self.assertEqual(outstanding_2, server_names[25:-1])
+
+        # ACT: call _wake_destinations_needing_catchup
+
+        # patch wake_destination to just count the destinations instead
+        woken = []
+
+        def wake_destination_track(destination):
+            woken.append(destination)
+
+        self.hs.get_federation_sender().wake_destination = wake_destination_track
+
+        # cancel the pre-existing timer for _wake_destinations_needing_catchup
+        # this is because we are calling it manually rather than waiting for it
+        # to be called automatically
+        self.hs.get_federation_sender()._catchup_after_startup_timer.cancel()
+
+        self.get_success(
+            self.hs.get_federation_sender()._wake_destinations_needing_catchup(), by=5.0
+        )
+
+        # ASSERT (_wake_destinations_needing_catchup):
+        # - all remotes are woken up, save for zzzerver
+        self.assertNotIn("zzzerver", woken)
+        # - all destinations are woken exactly once; they appear once in woken.
+        self.assertCountEqual(woken, server_names[:-1])