summary refs log tree commit diff
path: root/synapse/federation/federation_client.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-08-10 11:31:46 +0100
committerErik Johnston <erik@matrix.org>2016-08-10 11:31:46 +0100
commitf91df1f761b1e9e4da184560b0e7d9557129d064 (patch)
treeebeb4d2a3eac3376d7c18193819b26d6575de570 /synapse/federation/federation_client.py
parentMerge pull request #996 from matrix-org/erikj/tls_error (diff)
downloadsynapse-f91df1f761b1e9e4da184560b0e7d9557129d064.tar.xz
Store if we fail to fetch an event from a destination
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_client.py37
1 files changed, 36 insertions, 1 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index da95c2ad6d..baa672c4ac 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -51,10 +51,34 @@ sent_edus_counter = metrics.register_counter("sent_edus")
 sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
 
 
+PDU_RETRY_TIME_MS = 1 * 60 * 1000
+
+
 class FederationClient(FederationBase):
     def __init__(self, hs):
         super(FederationClient, self).__init__(hs)
 
+        self.pdu_destination_tried = {}
+        self._clock.looping_call(
+            self._clear_tried_cache, 60 * 1000,
+        )
+
+    def _clear_tried_cache(self):
+        """Clear pdu_destination_tried cache"""
+        now = self._clock.time_msec()
+
+        old_dict = self.pdu_destination_tried
+        self.pdu_destination_tried = {}
+
+        for event_id, destination_dict in old_dict.items():
+            destination_dict = {
+                dest: time
+                for dest, time in destination_dict.items()
+                if time + PDU_RETRY_TIME_MS > now
+            }
+            if destination_dict:
+                self.pdu_destination_tried[event_id] = destination_dict
+
     def start_get_pdu_cache(self):
         self._get_pdu_cache = ExpiringCache(
             cache_name="get_pdu_cache",
@@ -240,8 +264,15 @@ class FederationClient(FederationBase):
             if ev:
                 defer.returnValue(ev)
 
+        pdu_attempts = self.pdu_destination_tried.setdefault(event_id, {})
+
         pdu = None
         for destination in destinations:
+            now = self._clock.time_msec()
+            last_attempt = pdu_attempts.get(destination, 0)
+            if last_attempt + PDU_RETRY_TIME_MS > now:
+                continue
+
             try:
                 limiter = yield get_retry_limiter(
                     destination,
@@ -276,9 +307,11 @@ class FederationClient(FederationBase):
                 )
                 continue
             except CodeMessageException as e:
-                if 400 <= e.code < 500:
+                if 400 <= e.code < 500 and e.code != 404:
                     raise
 
+                pdu_attempts[destination] = now
+
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
                     event_id, destination, e,
@@ -288,6 +321,8 @@ class FederationClient(FederationBase):
                 logger.info(e.message)
                 continue
             except Exception as e:
+                pdu_attempts[destination] = now
+
                 logger.info(
                     "Failed to get PDU %s from %s because %s",
                     event_id, destination, e,