diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2024-03-22 13:24:11 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-03-22 13:24:11 +0000 |
commit | b5322b4daf2e13310200e57eb427568cb6a92ddf (patch) | |
tree | 8a8aed36f136df07d0fea011581bc4cadba821af /synapse/federation/sender/__init__.py | |
parent | Add OIDC config to add extra parameters to the authorize URL (#16971) (diff) | |
download | synapse-b5322b4daf2e13310200e57eb427568cb6a92ddf.tar.xz |
Ensure that pending to-device events are sent over federation at startup (#16925)
Fixes https://github.com/element-hq/synapse/issues/16680, as well as a related bug, where servers which we had *never* successfully sent an event to would not be retried. In order to fix the case of pending to-device messages, we hook into the existing `wake_destinations_needing_catchup` process, by extending it to look for destinations that have pending to-device messages. The federation transmission loop then attempts to send the pending to-device messages as normal.
Diffstat (limited to 'synapse/federation/sender/__init__.py')
-rw-r--r-- | synapse/federation/sender/__init__.py | 14 |
1 files changed, 6 insertions, 8 deletions
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 9ed6fc98b5..1888480881 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -192,10 +192,9 @@ sent_pdus_destination_dist_total = Counter( ) # Time (in s) to wait before trying to wake up destinations that have -# catch-up outstanding. This will also be the delay applied at startup -# before trying the same. +# catch-up outstanding. # Please note that rate limiting still applies, so while the loop is -# executed every X seconds the destinations may not be wake up because +# executed every X seconds the destinations may not be woken up because # they are being rate limited following previous attempt failures. WAKEUP_RETRY_PERIOD_SEC = 60 @@ -428,18 +427,17 @@ class FederationSender(AbstractFederationSender): / hs.config.ratelimiting.federation_rr_transactions_per_room_per_second ) + self._external_cache = hs.get_external_cache() + self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) + # Regularly wake up destinations that have outstanding PDUs to be caught up - self.clock.looping_call( + self.clock.looping_call_now( run_as_background_process, WAKEUP_RETRY_PERIOD_SEC * 1000.0, "wake_destinations_needing_catchup", self._wake_destinations_needing_catchup, ) - self._external_cache = hs.get_external_cache() - - self._destination_wakeup_queue = _DestinationWakeupQueue(self, self.clock) - def _get_per_destination_queue(self, destination: str) -> PerDestinationQueue: """Get or create a PerDestinationQueue for the given destination |