diff options
author | Erik Johnston <erik@matrix.org> | 2017-04-11 11:12:37 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-04-11 11:12:37 +0100 |
commit | 4902db1fc978d1c9a3681720cffc4dbb9d72dbea (patch) | |
tree | ee78b86f62119a7605ab2fdd1bc8afd3953e7978 /synapse/http/matrixfederationclient.py | |
parent | Merge branch 'release-v0.19.3' of github.com:matrix-org/synapse (diff) | |
parent | Bump changelog (diff) | |
download | synapse-4902db1fc978d1c9a3681720cffc4dbb9d72dbea.tar.xz |
Merge branch 'release-v0.20.0' of github.com:matrix-org/synapse v0.20.0
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 327 |
1 files changed, 196 insertions, 131 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 78b92cef36..62b4d7e93d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -12,8 +12,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - - +import synapse.util.retryutils from twisted.internet import defer, reactor, protocol from twisted.internet.error import DNSLookupError from twisted.web.client import readBody, HTTPConnectionPool, Agent @@ -22,7 +21,7 @@ from twisted.web._newclient import ResponseDone from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep -from synapse.util.logcontext import preserve_context_over_fn +from synapse.util import logcontext import synapse.metrics from canonicaljson import encode_canonical_json @@ -94,6 +93,7 @@ class MatrixFederationHttpClient(object): reactor, MatrixFederationEndpointFactory(hs), pool=pool ) self.clock = hs.get_clock() + self._store = hs.get_datastore() self.version_string = hs.version_string self._next_id = 1 @@ -103,123 +103,152 @@ class MatrixFederationHttpClient(object): ) @defer.inlineCallbacks - def _create_request(self, destination, method, path_bytes, - body_callback, headers_dict={}, param_bytes=b"", - query_bytes=b"", retry_on_dns_fail=True, - timeout=None, long_retries=False): - """ Creates and sends a request to the given url - """ - headers_dict[b"User-Agent"] = [self.version_string] - headers_dict[b"Host"] = [destination] + def _request(self, destination, method, path, + body_callback, headers_dict={}, param_bytes=b"", + query_bytes=b"", retry_on_dns_fail=True, + timeout=None, long_retries=False, + ignore_backoff=False, + backoff_on_404=False): + """ Creates and sends a request to the given server + Args: + destination (str): The remote server to send the HTTP request to. + method (str): HTTP method + path (str): The HTTP path + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. + backoff_on_404 (bool): Back off if we get a 404 - url_bytes = self._create_url( - destination, path_bytes, param_bytes, query_bytes + Returns: + Deferred: resolves with the http response object on success. + + Fails with ``HTTPRequestException``: if we get an HTTP response + code >= 300. + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. + """ + limiter = yield synapse.util.retryutils.get_retry_limiter( + destination, + self.clock, + self._store, + backoff_on_404=backoff_on_404, + ignore_backoff=ignore_backoff, ) - txn_id = "%s-O-%s" % (method, self._next_id) - self._next_id = (self._next_id + 1) % (sys.maxint - 1) + destination = destination.encode("ascii") + path_bytes = path.encode("ascii") + with limiter: + headers_dict[b"User-Agent"] = [self.version_string] + headers_dict[b"Host"] = [destination] - outbound_logger.info( - "{%s} [%s] Sending request: %s %s", - txn_id, destination, method, url_bytes - ) + url_bytes = self._create_url( + destination, path_bytes, param_bytes, query_bytes + ) - # XXX: Would be much nicer to retry only at the transaction-layer - # (once we have reliable transactions in place) - if long_retries: - retries_left = MAX_LONG_RETRIES - else: - retries_left = MAX_SHORT_RETRIES + txn_id = "%s-O-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) - http_url_bytes = urlparse.urlunparse( - ("", "", path_bytes, param_bytes, query_bytes, "") - ) + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes + ) - log_result = None - try: - while True: - producer = None - if body_callback: - producer = body_callback(method, http_url_bytes, headers_dict) - - try: - def send_request(): - request_deferred = preserve_context_over_fn( - self.agent.request, - method, - url_bytes, - Headers(headers_dict), - producer - ) + # XXX: Would be much nicer to retry only at the transaction-layer + # (once we have reliable transactions in place) + if long_retries: + retries_left = MAX_LONG_RETRIES + else: + retries_left = MAX_SHORT_RETRIES - return self.clock.time_bound_deferred( - request_deferred, - time_out=timeout / 1000. if timeout else 60, - ) + http_url_bytes = urlparse.urlunparse( + ("", "", path_bytes, param_bytes, query_bytes, "") + ) - response = yield preserve_context_over_fn(send_request) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, http_url_bytes, headers_dict) + + try: + def send_request(): + request_deferred = self.agent.request( + method, + url_bytes, + Headers(headers_dict), + producer + ) + + return self.clock.time_bound_deferred( + request_deferred, + time_out=timeout / 1000. if timeout else 60, + ) + + with logcontext.PreserveLoggingContext(): + response = yield send_request() + + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - log_result = "%d %s" % (response.code, response.phrase,) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "DNS Lookup failed to %s with %s", + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, destination, - e + method, + url_bytes, + type(e).__name__, + _flatten_response_never_received(e), ) - log_result = "DNS Lookup failed to %s with %s" % ( - destination, e + + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), ) - raise - - logger.warn( - "{%s} Sending request failed to %s: %s %s: %s - %s", - txn_id, - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - - log_result = "%s - %s" % ( - type(e).__name__, _flatten_response_never_received(e), - ) - - if retries_left and not timeout: - if long_retries: - delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) - delay = min(delay, 60) - delay *= random.uniform(0.8, 1.4) + + if retries_left and not timeout: + if long_retries: + delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left) + delay = min(delay, 60) + delay *= random.uniform(0.8, 1.4) + else: + delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) + delay = min(delay, 2) + delay *= random.uniform(0.8, 1.4) + + yield sleep(delay) + retries_left -= 1 else: - delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left) - delay = min(delay, 2) - delay *= random.uniform(0.8, 1.4) - - yield sleep(delay) - retries_left -= 1 - else: - raise - finally: - outbound_logger.info( - "{%s} [%s] Result: %s", - txn_id, - destination, - log_result, - ) + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) - if 200 <= response.code < 300: - pass - else: - # :'( - # Update transactions table? - body = yield preserve_context_over_fn(readBody, response) - raise HttpResponseException( - response.code, response.phrase, body - ) + if 200 <= response.code < 300: + pass + else: + # :'( + # Update transactions table? + with logcontext.PreserveLoggingContext(): + body = yield readBody(response) + raise HttpResponseException( + response.code, response.phrase, body + ) - defer.returnValue(response) + defer.returnValue(response) def sign_request(self, destination, method, url_bytes, headers_dict, content=None): @@ -248,7 +277,9 @@ class MatrixFederationHttpClient(object): @defer.inlineCallbacks def put_json(self, destination, path, data={}, json_data_callback=None, - long_retries=False, timeout=None): + long_retries=False, timeout=None, + ignore_backoff=False, + backoff_on_404=False): """ Sends the specifed json data using PUT Args: @@ -263,11 +294,19 @@ class MatrixFederationHttpClient(object): retry for a short or long time. timeout(int): How long to try (in ms) the destination for before giving up. None indicates no timeout. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. + backoff_on_404 (bool): True if we should count a 404 response as + a failure of the server (and should therefore back off future + requests) Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a CodeMessageException is raised. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ if not json_data_callback: @@ -282,26 +321,29 @@ class MatrixFederationHttpClient(object): producer = _JsonProducer(json_data) return producer - response = yield self._create_request( - destination.encode("ascii"), + response = yield self._request( + destination, "PUT", - path.encode("ascii"), + path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, timeout=timeout, + ignore_backoff=ignore_backoff, + backoff_on_404=backoff_on_404, ) if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? check_content_type_is_json(response.headers) - body = yield preserve_context_over_fn(readBody, response) + with logcontext.PreserveLoggingContext(): + body = yield readBody(response) defer.returnValue(json.loads(body)) @defer.inlineCallbacks def post_json(self, destination, path, data={}, long_retries=False, - timeout=None): + timeout=None, ignore_backoff=False): """ Sends the specifed json data using POST Args: @@ -314,11 +356,15 @@ class MatrixFederationHttpClient(object): retry for a short or long time. timeout(int): How long to try (in ms) the destination for before giving up. None indicates no timeout. - + ignore_backoff (bool): true to ignore the historical backoff data and + try the request anyway. Returns: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. On a 4xx or 5xx error response a CodeMessageException is raised. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ def body_callback(method, url_bytes, headers_dict): @@ -327,27 +373,29 @@ class MatrixFederationHttpClient(object): ) return _JsonProducer(data) - response = yield self._create_request( - destination.encode("ascii"), + response = yield self._request( + destination, "POST", - path.encode("ascii"), + path, body_callback=body_callback, headers_dict={"Content-Type": ["application/json"]}, long_retries=long_retries, timeout=timeout, + ignore_backoff=ignore_backoff, ) if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? check_content_type_is_json(response.headers) - body = yield preserve_context_over_fn(readBody, response) + with logcontext.PreserveLoggingContext(): + body = yield readBody(response) defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True, - timeout=None): + timeout=None, ignore_backoff=False): """ GETs some json from the given host homeserver and path Args: @@ -359,11 +407,16 @@ class MatrixFederationHttpClient(object): timeout (int): How long to try (in ms) the destination for before giving up. None indicates no timeout and that the request will be retried. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. Returns: Deferred: Succeeds when we get *any* HTTP response. The result of the deferred is a tuple of `(code, response)`, where `response` is a dict representing the decoded JSON body. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ logger.debug("get_json args: %s", args) @@ -380,36 +433,47 @@ class MatrixFederationHttpClient(object): self.sign_request(destination, method, url_bytes, headers_dict) return None - response = yield self._create_request( - destination.encode("ascii"), + response = yield self._request( + destination, "GET", - path.encode("ascii"), + path, query_bytes=query_bytes, body_callback=body_callback, retry_on_dns_fail=retry_on_dns_fail, timeout=timeout, + ignore_backoff=ignore_backoff, ) if 200 <= response.code < 300: # We need to update the transactions table to say it was sent? check_content_type_is_json(response.headers) - body = yield preserve_context_over_fn(readBody, response) + with logcontext.PreserveLoggingContext(): + body = yield readBody(response) defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_file(self, destination, path, output_stream, args={}, - retry_on_dns_fail=True, max_size=None): + retry_on_dns_fail=True, max_size=None, + ignore_backoff=False): """GETs a file from a given homeserver Args: destination (str): The remote server to send the HTTP request to. path (str): The HTTP path to GET. output_stream (file): File to write the response body to. args (dict): Optional dictionary used to create the query string. + ignore_backoff (bool): true to ignore the historical backoff data + and try the request anyway. Returns: - A (int,dict) tuple of the file length and a dict of the response - headers. + Deferred: resolves with an (int,dict) tuple of the file length and + a dict of the response headers. + + Fails with ``HTTPRequestException`` if we get an HTTP response code + >= 300 + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. """ encoded_args = {} @@ -419,28 +483,29 @@ class MatrixFederationHttpClient(object): encoded_args[k] = [v.encode("UTF-8") for v in vs] query_bytes = urllib.urlencode(encoded_args, True) - logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail) + logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail) def body_callback(method, url_bytes, headers_dict): self.sign_request(destination, method, url_bytes, headers_dict) return None - response = yield self._create_request( - destination.encode("ascii"), + response = yield self._request( + destination, "GET", - path.encode("ascii"), + path, query_bytes=query_bytes, body_callback=body_callback, - retry_on_dns_fail=retry_on_dns_fail + retry_on_dns_fail=retry_on_dns_fail, + ignore_backoff=ignore_backoff, ) headers = dict(response.headers.getAllRawHeaders()) try: - length = yield preserve_context_over_fn( - _readBodyToFile, - response, output_stream, max_size - ) + with logcontext.PreserveLoggingContext(): + length = yield _readBodyToFile( + response, output_stream, max_size + ) except: logger.exception("Failed to download body") raise |