diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index f15903f862..b0885dc979 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -12,8 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
-
+import synapse.util.retryutils
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, HTTPConnectionPool, Agent
@@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object):
reactor, MatrixFederationEndpointFactory(hs), pool=pool
)
self.clock = hs.get_clock()
+ self._store = hs.get_datastore()
self.version_string = hs.version_string
self._next_id = 1
@@ -106,133 +106,143 @@ class MatrixFederationHttpClient(object):
def _request(self, destination, method, path,
body_callback, headers_dict={}, param_bytes=b"",
query_bytes=b"", retry_on_dns_fail=True,
- timeout=None, long_retries=False):
+ timeout=None, long_retries=False, backoff_on_404=False):
""" Creates and sends a request to the given server
Args:
destination (str): The remote server to send the HTTP request to.
method (str): HTTP method
path (str): The HTTP path
+ backoff_on_404 (bool): Back off if we get a 404
Returns:
Deferred: resolves with the http response object on success.
Fails with ``HTTPRequestException``: if we get an HTTP response
- code >= 300.
+ code >= 300.
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
"""
+ limiter = yield synapse.util.retryutils.get_retry_limiter(
+ destination,
+ self.clock,
+ self._store,
+ backoff_on_404=backoff_on_404,
+ )
+
destination = destination.encode("ascii")
path_bytes = path.encode("ascii")
+ with limiter:
+ headers_dict[b"User-Agent"] = [self.version_string]
+ headers_dict[b"Host"] = [destination]
- headers_dict[b"User-Agent"] = [self.version_string]
- headers_dict[b"Host"] = [destination]
+ url_bytes = self._create_url(
+ destination, path_bytes, param_bytes, query_bytes
+ )
- url_bytes = self._create_url(
- destination, path_bytes, param_bytes, query_bytes
- )
+ txn_id = "%s-O-%s" % (method, self._next_id)
+ self._next_id = (self._next_id + 1) % (sys.maxint - 1)
- txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+ outbound_logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ txn_id, destination, method, url_bytes
+ )
- outbound_logger.info(
- "{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url_bytes
- )
+ # XXX: Would be much nicer to retry only at the transaction-layer
+ # (once we have reliable transactions in place)
+ if long_retries:
+ retries_left = MAX_LONG_RETRIES
+ else:
+ retries_left = MAX_SHORT_RETRIES
- # XXX: Would be much nicer to retry only at the transaction-layer
- # (once we have reliable transactions in place)
- if long_retries:
- retries_left = MAX_LONG_RETRIES
- else:
- retries_left = MAX_SHORT_RETRIES
+ http_url_bytes = urlparse.urlunparse(
+ ("", "", path_bytes, param_bytes, query_bytes, "")
+ )
- http_url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "")
- )
+ log_result = None
+ try:
+ while True:
+ producer = None
+ if body_callback:
+ producer = body_callback(method, http_url_bytes, headers_dict)
+
+ try:
+ def send_request():
+ request_deferred = preserve_context_over_fn(
+ self.agent.request,
+ method,
+ url_bytes,
+ Headers(headers_dict),
+ producer
+ )
+
+ return self.clock.time_bound_deferred(
+ request_deferred,
+ time_out=timeout / 1000. if timeout else 60,
+ )
+
+ response = yield preserve_context_over_fn(send_request)
+
+ log_result = "%d %s" % (response.code, response.phrase,)
+ break
+ except Exception as e:
+ if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+ logger.warn(
+ "DNS Lookup failed to %s with %s",
+ destination,
+ e
+ )
+ log_result = "DNS Lookup failed to %s with %s" % (
+ destination, e
+ )
+ raise
- log_result = None
- try:
- while True:
- producer = None
- if body_callback:
- producer = body_callback(method, http_url_bytes, headers_dict)
-
- try:
- def send_request():
- request_deferred = preserve_context_over_fn(
- self.agent.request,
+ logger.warn(
+ "{%s} Sending request failed to %s: %s %s: %s - %s",
+ txn_id,
+ destination,
method,
url_bytes,
- Headers(headers_dict),
- producer
+ type(e).__name__,
+ _flatten_response_never_received(e),
)
- return self.clock.time_bound_deferred(
- request_deferred,
- time_out=timeout / 1000. if timeout else 60,
+ log_result = "%s - %s" % (
+ type(e).__name__, _flatten_response_never_received(e),
)
- response = yield preserve_context_over_fn(send_request)
-
- log_result = "%d %s" % (response.code, response.phrase,)
- break
- except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn(
- "DNS Lookup failed to %s with %s",
- destination,
- e
- )
- log_result = "DNS Lookup failed to %s with %s" % (
- destination, e
- )
- raise
-
- logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s - %s",
- txn_id,
- destination,
- method,
- url_bytes,
- type(e).__name__,
- _flatten_response_never_received(e),
- )
-
- log_result = "%s - %s" % (
- type(e).__name__, _flatten_response_never_received(e),
- )
-
- if retries_left and not timeout:
- if long_retries:
- delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
- delay = min(delay, 60)
- delay *= random.uniform(0.8, 1.4)
+ if retries_left and not timeout:
+ if long_retries:
+ delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+ delay = min(delay, 60)
+ delay *= random.uniform(0.8, 1.4)
+ else:
+ delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+ delay = min(delay, 2)
+ delay *= random.uniform(0.8, 1.4)
+
+ yield sleep(delay)
+ retries_left -= 1
else:
- delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
- delay = min(delay, 2)
- delay *= random.uniform(0.8, 1.4)
-
- yield sleep(delay)
- retries_left -= 1
- else:
- raise
- finally:
- outbound_logger.info(
- "{%s} [%s] Result: %s",
- txn_id,
- destination,
- log_result,
- )
+ raise
+ finally:
+ outbound_logger.info(
+ "{%s} [%s] Result: %s",
+ txn_id,
+ destination,
+ log_result,
+ )
- if 200 <= response.code < 300:
- pass
- else:
- # :'(
- # Update transactions table?
- body = yield preserve_context_over_fn(readBody, response)
- raise HttpResponseException(
- response.code, response.phrase, body
- )
+ if 200 <= response.code < 300:
+ pass
+ else:
+ # :'(
+ # Update transactions table?
+ body = yield preserve_context_over_fn(readBody, response)
+ raise HttpResponseException(
+ response.code, response.phrase, body
+ )
- defer.returnValue(response)
+ defer.returnValue(response)
def sign_request(self, destination, method, url_bytes, headers_dict,
content=None):
@@ -261,7 +271,7 @@ class MatrixFederationHttpClient(object):
@defer.inlineCallbacks
def put_json(self, destination, path, data={}, json_data_callback=None,
- long_retries=False, timeout=None):
+ long_retries=False, timeout=None, backoff_on_404=False):
""" Sends the specifed json data using PUT
Args:
@@ -276,11 +286,17 @@ class MatrixFederationHttpClient(object):
retry for a short or long time.
timeout(int): How long to try (in ms) the destination for before
giving up. None indicates no timeout.
+ backoff_on_404 (bool): True if we should count a 404 response as
+ a failure of the server (and should therefore back off future
+ requests)
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
"""
if not json_data_callback:
@@ -303,6 +319,7 @@ class MatrixFederationHttpClient(object):
headers_dict={"Content-Type": ["application/json"]},
long_retries=long_retries,
timeout=timeout,
+ backoff_on_404=backoff_on_404,
)
if 200 <= response.code < 300:
@@ -332,6 +349,9 @@ class MatrixFederationHttpClient(object):
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body. On a 4xx or 5xx error response a
CodeMessageException is raised.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
"""
def body_callback(method, url_bytes, headers_dict):
@@ -377,6 +397,9 @@ class MatrixFederationHttpClient(object):
The result of the deferred is a tuple of `(code, response)`,
where `response` is a dict representing the decoded JSON body.
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
"""
logger.debug("get_json args: %s", args)
@@ -426,6 +449,9 @@ class MatrixFederationHttpClient(object):
Fails with ``HTTPRequestException`` if we get an HTTP response code
>= 300
+
+ Fails with ``NotRetryingDestination`` if we are not yet ready
+ to retry this server.
"""
encoded_args = {}
|