summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-10 11:31:46 +0100
committerErik Johnston <erik@matrix.org>2016-08-10 11:31:46 +0100
commitf91df1f761b1e9e4da184560b0e7d9557129d064 (patch)
treeebeb4d2a3eac3376d7c18193819b26d6575de570 /synapse/federation/federation_client.py
parentMerge pull request #996 from matrix-org/erikj/tls_error (diff)
downloadsynapse-f91df1f761b1e9e4da184560b0e7d9557129d064.tar.xz
Store if we fail to fetch an event from a destination
Diffstat (limited to 'synapse/federation/federation_client.py')
-rw-r--r--synapse/federation/federation_client.py37
1 files changed, 36 insertions, 1 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py

index da95c2ad6d..baa672c4ac 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py
@@ -51,10 +51,34 @@ sent_edus_counter = metrics.register_counter("sent_edus") sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +PDU_RETRY_TIME_MS = 1 * 60 * 1000 + + class FederationClient(FederationBase): def __init__(self, hs): super(FederationClient, self).__init__(hs) + self.pdu_destination_tried = {} + self._clock.looping_call( + self._clear_tried_cache, 60 * 1000, + ) + + def _clear_tried_cache(self): + """Clear pdu_destination_tried cache""" + now = self._clock.time_msec() + + old_dict = self.pdu_destination_tried + self.pdu_destination_tried = {} + + for event_id, destination_dict in old_dict.items(): + destination_dict = { + dest: time + for dest, time in destination_dict.items() + if time + PDU_RETRY_TIME_MS > now + } + if destination_dict: + self.pdu_destination_tried[event_id] = destination_dict + def start_get_pdu_cache(self): self._get_pdu_cache = ExpiringCache( cache_name="get_pdu_cache", @@ -240,8 +264,15 @@ class FederationClient(FederationBase): if ev: defer.returnValue(ev) + pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {}) + pdu = None for destination in destinations: + now = self._clock.time_msec() + last_attempt = pdu_attempts.get(destination, 0) + if last_attempt + PDU_RETRY_TIME_MS > now: + continue + try: limiter = yield get_retry_limiter( destination, @@ -276,9 +307,11 @@ class FederationClient(FederationBase): ) continue except CodeMessageException as e: - if 400 <= e.code < 500: + if 400 <= e.code < 500 and e.code != 404: raise + pdu_attempts[destination] = now + logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e, @@ -288,6 +321,8 @@ class FederationClient(FederationBase): logger.info(e.message) continue except Exception as e: + pdu_attempts[destination] = now + logger.info( "Failed to get PDU %s from %s because %s", event_id, destination, e,