diff options
author | reivilibre <38398653+reivilibre@users.noreply.github.com> | 2020-09-04 12:22:23 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-09-04 12:22:23 +0100 |
commit | 58f61f10f780a5f9e6be99f4072c24442594d597 (patch) | |
tree | 29aa5ec644d9ed05490f7a24227a014b76cedc08 /synapse/federation | |
parent | Fix type signature in simple_select_one_onecol and friends (#8241) (diff) | |
download | synapse-58f61f10f780a5f9e6be99f4072c24442594d597.tar.xz |
Catch-up after Federation Outage (split, 1) (#8230)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/sender/__init__.py | 11 |
1 files changed, 9 insertions, 2 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 552519e82c..41a726878d 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -209,7 +209,7 @@ class FederationSender: logger.debug("Sending %s to %r", event, destinations) if destinations: - self._send_pdu(event, destinations) + await self._send_pdu(event, destinations) now = self.clock.time_msec() ts = await self.store.get_received_ts(event.event_id) @@ -265,7 +265,7 @@ class FederationSender: finally: self._is_processing = False - def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: + async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None: # We loop through all destinations to see whether we already have # a transaction in progress. If we do, stick it in the pending_pdus # table and we'll get back to it later. @@ -280,6 +280,13 @@ class FederationSender: sent_pdus_destination_dist_total.inc(len(destinations)) sent_pdus_destination_dist_count.inc() + # track the fact that we have a PDU for these destinations, + # to allow us to perform catch-up later on if the remote is unreachable + # for a while. + await self.store.store_destination_rooms_entries( + destinations, pdu.room_id, pdu.internal_metadata.stream_ordering, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) |