From 8529fba02d93ed1d0d08873f0cbbd58a3194e4af Mon Sep 17 00:00:00 2001 From: Matthew Hodgson Date: Mon, 8 Dec 2014 19:34:51 +0000 Subject: fix a million stupid bugs and make it actually work --- synapse/federation/replication.py | 25 +++++++++++++++++-------- 1 file changed, 17 insertions(+), 8 deletions(-) (limited to 'synapse/federation/replication.py') diff --git a/synapse/federation/replication.py b/synapse/federation/replication.py index 88184caecd..c4c6667b62 100644 --- a/synapse/federation/replication.py +++ b/synapse/federation/replication.py @@ -685,6 +685,7 @@ class _TransactionQueue(object): self.transport_layer = transport_layer self._clock = hs.get_clock() + self.store = hs.get_datastore() # Is a mapping from destinations -> deferreds. Used to keep track # of which destinations have transactions in flight and when they are @@ -775,11 +776,18 @@ class _TransactionQueue(object): @defer.inlineCallbacks @log_function def _attempt_new_transaction(self, destination): - - (retry_last_ts, retry_interval) = self.store.get_destination_retry_timings(destination) - if retry_last_ts + retry_interval > int(self._clock.time_msec()): - logger.info("TX [%s] not ready for retry yet - dropping transaction for now") - return + + (retry_last_ts, retry_interval) = (0, 0) + retry_timings = yield self.store.get_destination_retry_timings(destination) + if retry_timings: + (retry_last_ts, retry_interval) = ( + retry_timings.retry_last_ts, retry_timings.retry_interval + ) + if retry_last_ts + retry_interval > int(self._clock.time_msec()): + logger.info("TX [%s] not ready for retry yet - dropping transaction for now", destination) + return + else: + logger.info("TX [%s] is ready for retry", destination) if destination in self.pending_transactions: # XXX: pending_transactions can get stuck on by a never-ending request @@ -866,7 +874,7 @@ class _TransactionQueue(object): if code == 200: if retry_last_ts: # this host is alive! reset retry schedule - self.store.set_destination_retry_timings(destination, 0, 0) + yield self.store.set_destination_retry_timings(destination, 0, 0) deferred.callback(None) else: self.start_retrying(destination, retry_interval) @@ -899,12 +907,13 @@ class _TransactionQueue(object): # Check to see if there is anything else to send. self._attempt_new_transaction(destination) + @defer.inlineCallbacks def start_retrying(self, destination, retry_interval): # track that this destination is having problems and we should # give it a chance to recover before trying it again if retry_interval: retry_interval *= 2 else: - retry_interval = 2 # try again at first after 2 seconds - self.store.set_destination_retry_timings(destination, + retry_interval = 2000 # try again at first after 2 seconds + yield self.store.set_destination_retry_timings(destination, int(self._clock.time_msec()), retry_interval) -- cgit 1.4.1