diff options
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r-- | synapse/federation/sender/__init__.py | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 552519e82c..41a726878d 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -209,7 +209,7 @@ class FederationSender: logger.debug("Sending %s to %r", event, destinations) if destinations: - self._send_pdu(event, destinations) + await self._send_pdu(event, destinations) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -265,7 +265,7 @@ class FederationSender: finally: self._is_processing = False - def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: + async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -280,6 +280,13 @@ class FederationSender: sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_count.inc() + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, pdu.room_id, pdu.internal_metadata.stream_ordering, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) |