summary refs log tree commit diff
path: root/synapse/federation/sender
diff options
context:
space:
mode:
authorreivilibre <38398653+reivilibre@users.noreply.github.com>2020-09-04 12:22:23 +0100
committerGitHub <noreply@github.com>2020-09-04 12:22:23 +0100
commit58f61f10f780a5f9e6be99f4072c24442594d597 (patch)
tree29aa5ec644d9ed05490f7a24227a014b76cedc08 /synapse/federation/sender
parentFix type signature in simple_select_one_onecol and friends (#8241) (diff)
downloadsynapse-58f61f10f780a5f9e6be99f4072c24442594d597.tar.xz
Catch-up after Federation Outage (split, 1) (#8230)
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse/federation/sender')
-rw-r--r--synapse/federation/sender/__init__.py11
1 files changed, 9 insertions, 2 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 552519e82c..41a726878d 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -209,7 +209,7 @@ class FederationSender:
                     logger.debug("Sending %s to %r", event, destinations)
 
                     if destinations:
-                        self._send_pdu(event, destinations)
+                        await self._send_pdu(event, destinations)
 
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
@@ -265,7 +265,7 @@ class FederationSender:
         finally:
             self._is_processing = False
 
-    def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
         # We loop through all destinations to see whether we already have
         # a transaction in progress. If we do, stick it in the pending_pdus
         # table and we'll get back to it later.
@@ -280,6 +280,13 @@ class FederationSender:
         sent_pdus_destination_dist_total.inc(len(destinations))
         sent_pdus_destination_dist_count.inc()
 
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations, pdu.room_id, pdu.internal_metadata.stream_ordering,
+        )
+
         for destination in destinations:
             self._get_per_destination_queue(destination).send_pdu(pdu)