From 58f61f10f780a5f9e6be99f4072c24442594d597 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Fri, 4 Sep 2020 12:22:23 +0100 Subject: Catch-up after Federation Outage (split, 1) (#8230) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/federation/sender/__init__.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/federation/sender') diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 552519e82c..41a726878d 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -209,7 +209,7 @@ class FederationSender: logger.debug("Sending %s to %r", event, destinations) if destinations: - self._send_pdu(event, destinations) + await self._send_pdu(event, destinations) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -265,7 +265,7 @@ class FederationSender: finally: self._is_processing = False - def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: + async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -280,6 +280,13 @@ class FederationSender: sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_count.inc() + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, pdu.room_id, pdu.internal_metadata.stream_ordering, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) -- cgit 1.4.1