diff --git a/changelog.d/8272.bugfix b/changelog.d/8272.bugfix
new file mode 100644
index 0000000000..532d0e22fe
--- /dev/null
+++ b/changelog.d/8272.bugfix
@@ -0,0 +1 @@
+Fix messages over federation being lost until an event is sent into the same room.
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 9f0852b4a2..2657767fd1 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -15,7 +15,7 @@
# limitations under the License.
import datetime
import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Tuple
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Tuple, cast
from prometheus_client import Counter
@@ -92,6 +92,21 @@ class PerDestinationQueue:
self._destination = destination
self.transmission_loop_running = False
+ # True whilst we are sending events that the remote homeserver missed
+ # because it was unreachable. We start in this state so we can perform
+ # catch-up at startup.
+ # New events will only be sent once this is finished, at which point
+ # _catching_up is flipped to False.
+ self._catching_up = True # type: bool
+
+ # The stream_ordering of the most recent PDU that was discarded due to
+ # being in catch-up mode.
+ self._catchup_last_skipped = 0 # type: int
+
+ # Cache of the last successfully-transmitted stream ordering for this
+ # destination (we are the only updater so this is safe)
+ self._last_successful_stream_ordering = None # type: Optional[int]
+
# a list of pending PDUs
self._pending_pdus = [] # type: List[EventBase]
@@ -138,7 +153,13 @@ class PerDestinationQueue:
Args:
pdu: pdu to send
"""
- self._pending_pdus.append(pdu)
+ if not self._catching_up or self._last_successful_stream_ordering is None:
+ # only enqueue the PDU if we are not catching up (_catching_up is
+ # False) or do not yet know whether we have anything to catch up
+ # (_last_successful_stream_ordering is None)
+ self._pending_pdus.append(pdu)
+ else:
+ self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
+
self.attempt_new_transaction()
def send_presence(self, states: Iterable[UserPresenceState]) -> None:
@@ -218,6 +239,13 @@ class PerDestinationQueue:
# hence why we throw the result away.
await get_retry_limiter(self._destination, self._clock, self._store)
+ if self._catching_up:
+ # we potentially need to catch up first
+ await self._catch_up_transmission_loop()
+ if self._catching_up:
+ # not caught up yet
+ return
+
pending_pdus = []
while True:
# We have to keep 2 free slots for presence and rr_edus
@@ -351,8 +379,9 @@ class PerDestinationQueue:
if e.retry_interval > 60 * 60 * 1000:
# we won't retry for another hour!
# (this suggests a significant outage)
- # We drop pending PDUs and EDUs because otherwise they will
+ # We drop pending EDUs because otherwise they will
# rack up indefinitely.
+ # (Dropping PDUs is already performed by `_start_catching_up`.)
# Note that:
# - the EDUs that are being dropped here are those that we can
# afford to drop (specifically, only typing notifications,
@@ -364,11 +393,12 @@ class PerDestinationQueue:
# dropping read receipts is a bit sad but should be solved
# through another mechanism, because this is all volatile!
- self._pending_pdus = []
self._pending_edus = []
self._pending_edus_keyed = {}
self._pending_presence = {}
self._pending_rrs = {}
+
+ self._start_catching_up()
except FederationDeniedError as e:
logger.info(e)
except HttpResponseException as e:
@@ -378,6 +408,8 @@ class PerDestinationQueue:
e.code,
e,
)
+
+ self._start_catching_up()
except RequestSendFailed as e:
logger.warning(
"TX [%s] Failed to send transaction: %s", self._destination, e
@@ -387,16 +419,96 @@ class PerDestinationQueue:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
+
+ self._start_catching_up()
except Exception:
logger.exception("TX [%s] Failed to send transaction", self._destination)
for p in pending_pdus:
logger.info(
"Failed to send event %s to %s", p.event_id, self._destination
)
+
+ self._start_catching_up()
finally:
# We want to be *very* sure we clear this after we stop processing
self.transmission_loop_running = False
+ async def _catch_up_transmission_loop(self) -> None:
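+ """
+ Sends the destination any events it missed while it was unreachable,
+ based on the last_successful_stream_ordering we have recorded for it.
+
+ Only the most recent un-sent event in each room is transmitted; once
+ there is nothing left to send, _catching_up is cleared so that new
+ events flow through the normal queue again.
+ """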
+ first_catch_up_check = self._last_successful_stream_ordering is None
+
+ if first_catch_up_check:
+ # first catchup so get last_successful_stream_ordering from database
+ self._last_successful_stream_ordering = await self._store.get_destination_last_successful_stream_ordering(
+ self._destination
+ )
+
+ if self._last_successful_stream_ordering is None:
+ # if it's still None, then we don't have the information in our
+ # database: we haven't successfully sent a PDU to this server
+ # (at least since the introduction of the feature tracking
+ # last_successful_stream_ordering).
+ # Sadly, this means we can't do anything here as we don't know
+ # what needs catching up, so catching up is futile; let's stop.
+ self._catching_up = False
+ return
+
+ # fetch at most 50 catch-up rooms/PDUs at a time
+ while True:
+ event_ids = await self._store.get_catch_up_room_event_ids(
+ self._destination, self._last_successful_stream_ordering,
+ )
+
+ if not event_ids:
+ # No more events to catch up on, but we can't ignore the chance
+ # of a race condition, so we check that no new events have been
+ # skipped due to us being in catch-up mode
+
+ if self._catchup_last_skipped > self._last_successful_stream_ordering:
+ # another event has been skipped because we were in catch-up mode
+ continue
+
+ # we are done catching up!
+ self._catching_up = False
+ break
+
+ if first_catch_up_check:
+ # as this is our first catch-up check, we may have PDUs in
+ # the queue from before we *knew* we had to do catch-up, so
+ # clear those out now.
+ self._start_catching_up()
+
+ # fetch the relevant events from the event store
+ # - redacted behaviour of REDACT is fine, since we only send metadata
+ # of redacted events to the destination.
+ # - don't need to worry about rejected events as we do not actively
+ # forward received events over federation.
+ catchup_pdus = await self._store.get_events_as_list(event_ids)
+ if not catchup_pdus:
+ raise AssertionError(
+ "No events retrieved when we asked for %r. "
+ "This should not happen." % event_ids
+ )
+
+ if logger.isEnabledFor(logging.INFO):
+ # use a list rather than a generator, otherwise %r logs the generator
+ # object instead of the room IDs
+ rooms = [p.room_id for p in catchup_pdus]
+ logger.info("Catching up rooms to %s: %r", self._destination, rooms)
+
+ success = await self._transaction_manager.send_new_transaction(
+ self._destination, catchup_pdus, []
+ )
+
+ if not success:
+ return
+
+ sent_transactions_counter.inc()
+ final_pdu = catchup_pdus[-1]
+ self._last_successful_stream_ordering = cast(
+ int, final_pdu.internal_metadata.stream_ordering
+ )
+ await self._store.set_destination_last_successful_stream_ordering(
+ self._destination, self._last_successful_stream_ordering
+ )
+
def _get_rr_edus(self, force_flush: bool) -> Iterable[Edu]:
if not self._pending_rrs:
return
@@ -457,3 +569,12 @@ class PerDestinationQueue:
]
return (edus, stream_id)
+
+ def _start_catching_up(self) -> None:
+ """
+ Marks this destination as being in catch-up mode.
+
+ This throws away the PDU queue.
+ """
+ self._catching_up = True
+ self._pending_pdus = []
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index c0a958252e..091367006e 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -15,7 +15,7 @@
import logging
from collections import namedtuple
-from typing import Iterable, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
from canonicaljson import encode_canonical_json
@@ -371,3 +371,44 @@ class TransactionStore(SQLBaseStore):
values={"last_successful_stream_ordering": last_successful_stream_ordering},
desc="set_last_successful_stream_ordering",
)
+
+ async def get_catch_up_room_event_ids(
+ self, destination: str, last_successful_stream_ordering: int,
+ ) -> List[str]:
+ """
+ Returns at most 50 event IDs corresponding to the oldest events that
+ have not yet been sent to the destination.
+
+ Args:
+ destination: the destination in question
+ last_successful_stream_ordering: the stream_ordering of the
+ most-recently successfully-transmitted event to the destination
+
+ Returns:
+ list of event_ids
+ """
+ return await self.db_pool.runInteraction(
+ "get_catch_up_room_event_ids",
+ self._get_catch_up_room_event_ids_txn,
+ destination,
+ last_successful_stream_ordering,
+ )
+
+ @staticmethod
+ def _get_catch_up_room_event_ids_txn(
+ txn, destination: str, last_successful_stream_ordering: int,
+ ) -> List[str]:
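+ # destination_rooms records, per (destination, room), the stream_ordering
+ # of the newest event that still needs sending to that destination, so the
+ # JOIN yields at most one pending event per room.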
+ q = """
+ SELECT event_id FROM destination_rooms
+ JOIN events USING (stream_ordering)
+ WHERE destination = ?
+ AND stream_ordering > ?
+ ORDER BY stream_ordering
+ LIMIT 50
+ """
+ txn.execute(
+ q, (destination, last_successful_stream_ordering),
+ )
+ event_ids = [row[0] for row in txn]
+ return event_ids
diff --git a/tests/federation/test_federation_catch_up.py b/tests/federation/test_federation_catch_up.py
index 6cdcc378f0..cc52c3dfac 100644
--- a/tests/federation/test_federation_catch_up.py
+++ b/tests/federation/test_federation_catch_up.py
@@ -1,5 +1,10 @@
+from typing import List, Tuple
+
from mock import Mock
+from synapse.events import EventBase
+from synapse.federation.sender import PerDestinationQueue, TransactionManager
+from synapse.federation.units import Edu
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
@@ -156,3 +161,163 @@ class FederationCatchUpTestCases(FederatingHomeserverTestCase):
row_2["stream_ordering"],
"Send succeeded but not marked as last_successful_stream_ordering",
)
+
+ @override_config({"send_federation": True}) # critical to federate
+ def test_catch_up_from_blank_state(self):
+ """
+ Runs an overall test of federation catch-up from scratch.
+ Further tests will focus on more narrow aspects and edge-cases, but I
+ hope to provide an overall view with this test.
+ """
+ # bring the other server online
+ self.is_online = True
+
+ # let's make some events for the other server to receive
+ self.register_user("u1", "you the one")
+ u1_token = self.login("u1", "you the one")
+ room_1 = self.helper.create_room_as("u1", tok=u1_token)
+ room_2 = self.helper.create_room_as("u1", tok=u1_token)
+
+ # also critical to federate
+ self.get_success(
+ event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+ )
+ self.get_success(
+ event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+ )
+
+ self.helper.send_state(
+ room_1, event_type="m.room.topic", body={"topic": "wombat"}, tok=u1_token
+ )
+
+ # check: PDU received for topic event
+ self.assertEqual(len(self.pdus), 1)
+ self.assertEqual(self.pdus[0]["type"], "m.room.topic")
+
+ # take the remote offline
+ self.is_online = False
+
+ # send another event
+ self.helper.send(room_1, "hi user!", tok=u1_token)
+
+ # check: things didn't go well since the remote is down
+ self.assertEqual(len(self.failed_pdus), 1)
+ self.assertEqual(self.failed_pdus[0]["content"]["body"], "hi user!")
+
+ # let's delete the federation transmission queue
+ # (this pretends we are starting up fresh.)
+ self.assertFalse(
+ self.hs.get_federation_sender()
+ ._per_destination_queues["host2"]
+ .transmission_loop_running
+ )
+ del self.hs.get_federation_sender()._per_destination_queues["host2"]
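+ # (the queue recreated for host2 on the next send will start with
+ # _catching_up = True, just as a queue does at startup)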
+
+ # let's also clear any backoffs
+ self.get_success(
+ self.hs.get_datastore().set_destination_retry_timings("host2", None, 0, 0)
+ )
+
+ # bring the remote online and clear the received pdu list
+ self.is_online = True
+ self.pdus = []
+
+ # now we need to initiate a federation transaction somehow…
+ # to do that, let's send another event (because it's simple to do)
+ # (do it to another room otherwise the catch-up logic decides it doesn't
+ # need to catch up room_1 — something I overlooked when first writing
+ # this test)
+ self.helper.send(room_2, "wombats!", tok=u1_token)
+
+ # we should now have received both PDUs
+ self.assertEqual(len(self.pdus), 2)
+ self.assertEqual(self.pdus[0]["content"]["body"], "hi user!")
+ self.assertEqual(self.pdus[1]["content"]["body"], "wombats!")
+
+ def make_fake_destination_queue(
+ self, destination: str = "host2"
+ ) -> Tuple[PerDestinationQueue, List[EventBase]]:
+ """
+ Makes a fake per-destination queue whose TransactionManager records the
+ PDUs it is asked to send in a list instead of transmitting them.
+ """
+ transaction_manager = TransactionManager(self.hs)
+ per_dest_queue = PerDestinationQueue(self.hs, transaction_manager, destination)
+ results_list = []
+
+ async def fake_send(
+ destination_tm: str,
+ pending_pdus: List[EventBase],
+ _pending_edus: List[Edu],
+ ) -> bool:
+ assert destination == destination_tm
+ results_list.extend(pending_pdus)
+ return True # success!
+
+ transaction_manager.send_new_transaction = fake_send
+
+ return per_dest_queue, results_list
+
+ @override_config({"send_federation": True})
+ def test_catch_up_loop(self):
+ """
+ Tests the behaviour of _catch_up_transmission_loop.
+ """
+
+ # ARRANGE:
+ # - a local user (u1)
+ # - 3 rooms which u1 is joined to (and remote user @user:host2 is
+ # joined to)
+ # - some events (1 to 5) in those rooms
+ # - we have 'already sent' events 1 and 2 to host2
+ per_dest_queue, sent_pdus = self.make_fake_destination_queue()
+
+ self.register_user("u1", "you the one")
+ u1_token = self.login("u1", "you the one")
+ room_1 = self.helper.create_room_as("u1", tok=u1_token)
+ room_2 = self.helper.create_room_as("u1", tok=u1_token)
+ room_3 = self.helper.create_room_as("u1", tok=u1_token)
+ self.get_success(
+ event_injection.inject_member_event(self.hs, room_1, "@user:host2", "join")
+ )
+ self.get_success(
+ event_injection.inject_member_event(self.hs, room_2, "@user:host2", "join")
+ )
+ self.get_success(
+ event_injection.inject_member_event(self.hs, room_3, "@user:host2", "join")
+ )
+
+ # create some events
+ self.helper.send(room_1, "you hear me!!", tok=u1_token)
+ event_id_2 = self.helper.send(room_2, "wombats!", tok=u1_token)["event_id"]
+ self.helper.send(room_3, "Matrix!", tok=u1_token)
+ event_id_4 = self.helper.send(room_2, "rabbits!", tok=u1_token)["event_id"]
+ event_id_5 = self.helper.send(room_3, "Synapse!", tok=u1_token)["event_id"]
+
+ # destination_rooms should already be populated, but let us pretend that we already
+ # sent (successfully) up to and including event id 2
+ event_2 = self.get_success(self.hs.get_datastore().get_event(event_id_2))
+
+ # also fetch event 5 so we can later compare its stream_ordering
+ # against the destination's last_successful_stream_ordering
+ event_5 = self.get_success(self.hs.get_datastore().get_event(event_id_5))
+
+ self.get_success(
+ self.hs.get_datastore().set_destination_last_successful_stream_ordering(
+ "host2", event_2.internal_metadata.stream_ordering
+ )
+ )
+
+ # ACT
+ self.get_success(per_dest_queue._catch_up_transmission_loop())
+
+ # ASSERT, noticing in particular:
+ # - event 3 not sent out, because event 5 replaces it
+ # - order is least recent first, so event 5 comes after event 4
+ # - catch-up is completed
+ self.assertEqual(len(sent_pdus), 2)
+ self.assertEqual(sent_pdus[0].event_id, event_id_4)
+ self.assertEqual(sent_pdus[1].event_id, event_id_5)
+ self.assertFalse(per_dest_queue._catching_up)
+ self.assertEqual(
+ per_dest_queue._last_successful_stream_ordering,
+ event_5.internal_metadata.stream_ordering,
+ )
diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py
index f306a09bfa..3fec09ea8a 100644
--- a/tests/handlers/test_typing.py
+++ b/tests/handlers/test_typing.py
@@ -73,6 +73,7 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
"delivered_txn",
"get_received_txn_response",
"set_received_txn_response",
+ "get_destination_last_successful_stream_ordering",
"get_destination_retry_timings",
"get_devices_by_remote",
"maybe_store_room_on_invite",
@@ -121,6 +122,10 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase):
(0, [])
)
+ self.datastore.get_destination_last_successful_stream_ordering.return_value = make_awaitable(
+ None
+ )
+
def get_received_txn_response(*args):
return defer.succeed(None)
|