summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py228
1 files changed, 127 insertions, 101 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index f15903f862..b0885dc979 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-
+import synapse.util.retryutils
 from twisted.internet import defer, reactor, protocol
 from twisted.internet.error import DNSLookupError
 from twisted.web.client import readBody, HTTPConnectionPool, Agent
@@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object):
             reactor, MatrixFederationEndpointFactory(hs), pool=pool
         )
         self.clock = hs.get_clock()
+        self._store = hs.get_datastore()
         self.version_string = hs.version_string
         self._next_id = 1
 
@@ -106,133 +106,143 @@ class MatrixFederationHttpClient(object):
     def _request(self, destination, method, path,
                  body_callback, headers_dict={}, param_bytes=b"",
                  query_bytes=b"", retry_on_dns_fail=True,
-                 timeout=None, long_retries=False):
+                 timeout=None, long_retries=False, backoff_on_404=False):
         """ Creates and sends a request to the given server
         Args:
             destination (str): The remote server to send the HTTP request to.
             method (str): HTTP method
             path (str): The HTTP path
+            backoff_on_404 (bool): Back off if we get a 404
 
         Returns:
             Deferred: resolves with the http response object on success.
 
             Fails with ``HTTPRequestException``: if we get an HTTP response
-            code >= 300.
+                code >= 300.
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+                to retry this server.
         """
+        limiter = yield synapse.util.retryutils.get_retry_limiter(
+            destination,
+            self.clock,
+            self._store,
+            backoff_on_404=backoff_on_404,
+        )
+
         destination = destination.encode("ascii")
         path_bytes = path.encode("ascii")
+        with limiter:
+            headers_dict[b"User-Agent"] = [self.version_string]
+            headers_dict[b"Host"] = [destination]
 
-        headers_dict[b"User-Agent"] = [self.version_string]
-        headers_dict[b"Host"] = [destination]
+            url_bytes = self._create_url(
+                destination, path_bytes, param_bytes, query_bytes
+            )
 
-        url_bytes = self._create_url(
-            destination, path_bytes, param_bytes, query_bytes
-        )
+            txn_id = "%s-O-%s" % (method, self._next_id)
+            self._next_id = (self._next_id + 1) % (sys.maxint - 1)
 
-        txn_id = "%s-O-%s" % (method, self._next_id)
-        self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+            outbound_logger.info(
+                "{%s} [%s] Sending request: %s %s",
+                txn_id, destination, method, url_bytes
+            )
 
-        outbound_logger.info(
-            "{%s} [%s] Sending request: %s %s",
-            txn_id, destination, method, url_bytes
-        )
+            # XXX: Would be much nicer to retry only at the transaction-layer
+            # (once we have reliable transactions in place)
+            if long_retries:
+                retries_left = MAX_LONG_RETRIES
+            else:
+                retries_left = MAX_SHORT_RETRIES
 
-        # XXX: Would be much nicer to retry only at the transaction-layer
-        # (once we have reliable transactions in place)
-        if long_retries:
-            retries_left = MAX_LONG_RETRIES
-        else:
-            retries_left = MAX_SHORT_RETRIES
+            http_url_bytes = urlparse.urlunparse(
+                ("", "", path_bytes, param_bytes, query_bytes, "")
+            )
 
-        http_url_bytes = urlparse.urlunparse(
-            ("", "", path_bytes, param_bytes, query_bytes, "")
-        )
+            log_result = None
+            try:
+                while True:
+                    producer = None
+                    if body_callback:
+                        producer = body_callback(method, http_url_bytes, headers_dict)
+
+                    try:
+                        def send_request():
+                            request_deferred = preserve_context_over_fn(
+                                self.agent.request,
+                                method,
+                                url_bytes,
+                                Headers(headers_dict),
+                                producer
+                            )
+
+                            return self.clock.time_bound_deferred(
+                                request_deferred,
+                                time_out=timeout / 1000. if timeout else 60,
+                            )
+
+                        response = yield preserve_context_over_fn(send_request)
+
+                        log_result = "%d %s" % (response.code, response.phrase,)
+                        break
+                    except Exception as e:
+                        if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                            logger.warn(
+                                "DNS Lookup failed to %s with %s",
+                                destination,
+                                e
+                            )
+                            log_result = "DNS Lookup failed to %s with %s" % (
+                                destination, e
+                            )
+                            raise
 
-        log_result = None
-        try:
-            while True:
-                producer = None
-                if body_callback:
-                    producer = body_callback(method, http_url_bytes, headers_dict)
-
-                try:
-                    def send_request():
-                        request_deferred = preserve_context_over_fn(
-                            self.agent.request,
+                        logger.warn(
+                            "{%s} Sending request failed to %s: %s %s: %s - %s",
+                            txn_id,
+                            destination,
                             method,
                             url_bytes,
-                            Headers(headers_dict),
-                            producer
+                            type(e).__name__,
+                            _flatten_response_never_received(e),
                         )
 
-                        return self.clock.time_bound_deferred(
-                            request_deferred,
-                            time_out=timeout / 1000. if timeout else 60,
+                        log_result = "%s - %s" % (
+                            type(e).__name__, _flatten_response_never_received(e),
                         )
 
-                    response = yield preserve_context_over_fn(send_request)
-
-                    log_result = "%d %s" % (response.code, response.phrase,)
-                    break
-                except Exception as e:
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
-                        logger.warn(
-                            "DNS Lookup failed to %s with %s",
-                            destination,
-                            e
-                        )
-                        log_result = "DNS Lookup failed to %s with %s" % (
-                            destination, e
-                        )
-                        raise
-
-                    logger.warn(
-                        "{%s} Sending request failed to %s: %s %s: %s - %s",
-                        txn_id,
-                        destination,
-                        method,
-                        url_bytes,
-                        type(e).__name__,
-                        _flatten_response_never_received(e),
-                    )
-
-                    log_result = "%s - %s" % (
-                        type(e).__name__, _flatten_response_never_received(e),
-                    )
-
-                    if retries_left and not timeout:
-                        if long_retries:
-                            delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
-                            delay = min(delay, 60)
-                            delay *= random.uniform(0.8, 1.4)
+                        if retries_left and not timeout:
+                            if long_retries:
+                                delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+                                delay = min(delay, 60)
+                                delay *= random.uniform(0.8, 1.4)
+                            else:
+                                delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+                                delay = min(delay, 2)
+                                delay *= random.uniform(0.8, 1.4)
+
+                            yield sleep(delay)
+                            retries_left -= 1
                         else:
-                            delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
-                            delay = min(delay, 2)
-                            delay *= random.uniform(0.8, 1.4)
-
-                        yield sleep(delay)
-                        retries_left -= 1
-                    else:
-                        raise
-        finally:
-            outbound_logger.info(
-                "{%s} [%s] Result: %s",
-                txn_id,
-                destination,
-                log_result,
-            )
+                            raise
+            finally:
+                outbound_logger.info(
+                    "{%s} [%s] Result: %s",
+                    txn_id,
+                    destination,
+                    log_result,
+                )
 
-        if 200 <= response.code < 300:
-            pass
-        else:
-            # :'(
-            # Update transactions table?
-            body = yield preserve_context_over_fn(readBody, response)
-            raise HttpResponseException(
-                response.code, response.phrase, body
-            )
+            if 200 <= response.code < 300:
+                pass
+            else:
+                # :'(
+                # Update transactions table?
+                body = yield preserve_context_over_fn(readBody, response)
+                raise HttpResponseException(
+                    response.code, response.phrase, body
+                )
 
-        defer.returnValue(response)
+            defer.returnValue(response)
 
     def sign_request(self, destination, method, url_bytes, headers_dict,
                      content=None):
@@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object):
 
     @defer.inlineCallbacks
     def put_json(self, destination, path, data={}, json_data_callback=None,
-                 long_retries=False, timeout=None):
+                 long_retries=False, timeout=None, backoff_on_404=False):
         """ Sends the specifed json data using PUT
 
         Args:
@@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object):
                 retry for a short or long time.
             timeout(int): How long to try (in ms) the destination for before
                 giving up. None indicates no timeout.
+            backoff_on_404 (bool): True if we should count a 404 response as
+                a failure of the server (and should therefore back off future
+                requests)
 
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
             CodeMessageException is raised.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         if not json_data_callback:
@@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object):
             headers_dict={"Content-Type": ["application/json"]},
             long_retries=long_retries,
             timeout=timeout,
+            backoff_on_404=backoff_on_404,
         )
 
         if 200 <= response.code < 300:
@@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object):
             Deferred: Succeeds when we get a 2xx HTTP response. The result
             will be the decoded JSON body. On a 4xx or 5xx error response a
             CodeMessageException is raised.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         def body_callback(method, url_bytes, headers_dict):
@@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object):
 
             The result of the deferred is a tuple of `(code, response)`,
             where `response` is a dict representing the decoded JSON body.
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
         logger.debug("get_json args: %s", args)
 
@@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object):
 
             Fails with ``HTTPRequestException`` if we get an HTTP response code
             >= 300
+
+            Fails with ``NotRetryingDestination`` if we are not yet ready
+            to retry this server.
         """
 
         encoded_args = {}