diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/http/matrixfederationclient.py | 118 |
1 files changed, 97 insertions, 21 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 1dda3ba2c7..7db001cc63 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -20,18 +20,19 @@ from twisted.web.client import readBody, _AgentBase, _URI from twisted.web.http_headers import Headers from twisted.web._newclient import ResponseDone -from synapse.http.agent_name import AGENT_NAME from synapse.http.endpoint import matrix_federation_endpoint from synapse.util.async import sleep from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json -from synapse.api.errors import CodeMessageException, SynapseError, Codes +from synapse.api.errors import ( + SynapseError, Codes, HttpResponseException, +) from syutil.crypto.jsonsign import sign_json -import json +import simplejson as json import logging import urllib import urlparse @@ -77,6 +78,8 @@ class MatrixFederationHttpClient(object): self.signing_key = hs.config.signing_key[0] self.server_name = hs.hostname self.agent = MatrixFederationHttpAgent(reactor) + self.clock = hs.get_clock() + self.version_string = hs.version_string @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, @@ -84,7 +87,7 @@ class MatrixFederationHttpClient(object): query_bytes=b"", retry_on_dns_fail=True): """ Creates and sends a request to the given url """ - headers_dict[b"User-Agent"] = [AGENT_NAME] + headers_dict[b"User-Agent"] = [self.version_string] headers_dict[b"Host"] = [destination] url_bytes = urlparse.urlunparse( @@ -116,7 +119,7 @@ class MatrixFederationHttpClient(object): try: with PreserveLoggingContext(): - response = yield self.agent.request( + request_deferred = self.agent.request( destination, endpoint, method, @@ -127,6 +130,11 @@ class MatrixFederationHttpClient(object): producer ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=60, + ) + logger.debug("Got response to %s", method) break except Exception as e: @@ -136,16 +144,16 @@ class MatrixFederationHttpClient(object): destination, e ) - raise SynapseError(400, "Domain specified not found.") + raise logger.warn( - "Sending request failed to %s: %s %s : %s", + "Sending request failed to %s: %s %s: %s - %s", destination, method, url_bytes, - e + type(e).__name__, + _flatten_response_never_received(e), ) - _print_ex(e) if retries_left: yield sleep(2 ** (5 - retries_left)) @@ -163,13 +171,13 @@ class MatrixFederationHttpClient(object): ) if 200 <= response.code < 300: - # We need to update the transactions table to say it was sent? pass else: # :'( # Update transactions table? - raise CodeMessageException( - response.code, response.phrase + body = yield readBody(response) + raise HttpResponseException( + response.code, response.phrase, body ) defer.returnValue(response) @@ -238,11 +246,66 @@ class MatrixFederationHttpClient(object): headers_dict={"Content-Type": ["application/json"]}, ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + logger.debug("Getting resp body") body = yield readBody(response) logger.debug("Got resp body") - defer.returnValue((response.code, body)) + defer.returnValue(json.loads(body)) + + @defer.inlineCallbacks + def post_json(self, destination, path, data={}): + """ Sends the specifed json data using POST + + Args: + destination (str): The remote server to send the HTTP request + to. + path (str): The HTTP path. + data (dict): A dict containing the data that will be used as + the request body. This will be encoded as JSON. + + Returns: + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. On a 4xx or 5xx error response a + CodeMessageException is raised. + """ + + def body_callback(method, url_bytes, headers_dict): + self.sign_request( + destination, method, url_bytes, headers_dict, data + ) + return _JsonProducer(data) + + response = yield self._create_request( + destination.encode("ascii"), + "POST", + path.encode("ascii"), + body_callback=body_callback, + headers_dict={"Content-Type": ["application/json"]}, + ) + + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + + logger.debug("Getting resp body") + body = yield readBody(response) + logger.debug("Got resp body") + + defer.returnValue(json.loads(body)) @defer.inlineCallbacks def get_json(self, destination, path, args={}, retry_on_dns_fail=True): @@ -284,7 +347,18 @@ class MatrixFederationHttpClient(object): retry_on_dns_fail=retry_on_dns_fail ) + if 200 <= response.code < 300: + # We need to update the transactions table to say it was sent? + c_type = response.headers.getRawHeaders("Content-Type") + + if "application/json" not in c_type: + raise RuntimeError( + "Content-Type not application/json" + ) + + logger.debug("Getting resp body") body = yield readBody(response) + logger.debug("Got resp body") defer.returnValue(json.loads(body)) @@ -373,14 +447,6 @@ def _readBodyToFile(response, stream, max_size): return d -def _print_ex(e): - if hasattr(e, "reasons") and e.reasons: - for ex in e.reasons: - _print_ex(ex) - else: - logger.warn(e) - - class _JsonProducer(object): """ Used by the twisted http client to create the HTTP body from json """ @@ -400,3 +466,13 @@ class _JsonProducer(object): def stopProducing(self): pass + + +def _flatten_response_never_received(e): + if hasattr(e, "reasons"): + return ", ".join( + _flatten_response_never_received(f.value) + for f in e.reasons + ) + else: + return "%s: %s" % (type(e).__name__, e.message,) |