diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index 58ef8d3ce4..a3f9e4f67c 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -38,12 +38,12 @@ def cancelled_to_request_timed_out_error(value, timeout):
return value
-ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
def redact_uri(uri):
"""Strips access tokens from the uri replaces with <redacted>"""
return ACCESS_TOKEN_RE.sub(
- br'\1<redacted>\3',
+ r'\1<redacted>\3',
uri
)
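Reviewer note: with ACCESS_TOKEN_RE now compiled against text rather than bytes, redact_uri expects and returns unicode strings. A quick illustrative check (the path and token value here are made up):

    >>> redact_uri("/_matrix/client/r0/sync?access_token=secret&since=s72594_4483")
    '/_matrix/client/r0/sync?access_token=<redacted>&since=s72594_4483'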
diff --git a/synapse/http/client.py b/synapse/http/client.py
index ab4fbf59b2..3d05f83b8c 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -13,24 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+
import logging
-import urllib
-from six import StringIO
+from six import text_type
+from six.moves import urllib
+import treq
from canonicaljson import encode_canonical_json, json
from prometheus_client import Counter
from OpenSSL import SSL
from OpenSSL.SSL import VERIFY_NONE
-from twisted.internet import defer, protocol, reactor, ssl, task
+from twisted.internet import defer, protocol, reactor, ssl
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.web._newclient import ResponseDone
from twisted.web.client import (
Agent,
BrowserLikeRedirectAgent,
ContentDecoderAgent,
- FileBodyProducer as TwistedFileBodyProducer,
GzipDecoder,
HTTPConnectionPool,
PartialDownloadError,
@@ -42,7 +43,7 @@ from twisted.web.http_headers import Headers
from synapse.api.errors import Codes, HttpResponseException, SynapseError
from synapse.http import cancelled_to_request_timed_out_error, redact_uri
from synapse.http.endpoint import SpiderEndpoint
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
@@ -83,8 +84,10 @@ class SimpleHttpClient(object):
if hs.config.user_agent_suffix:
self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix,)
+ self.user_agent = self.user_agent.encode('ascii')
+
@defer.inlineCallbacks
- def request(self, method, uri, *args, **kwargs):
+ def request(self, method, uri, data=b'', headers=None):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
outgoing_requests_counter.labels(method).inc()
@@ -93,10 +96,10 @@ class SimpleHttpClient(object):
logger.info("Sending request %s %s", method, redact_uri(uri))
try:
- request_deferred = self.agent.request(
- method, uri, *args, **kwargs
+ request_deferred = treq.request(
+ method, uri, agent=self.agent, data=data, headers=headers
)
- add_timeout_to_deferred(
+ request_deferred = timeout_deferred(
request_deferred, 60, self.hs.get_reactor(),
cancelled_to_request_timed_out_error,
)
@@ -112,7 +115,7 @@ class SimpleHttpClient(object):
incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
- method, redact_uri(uri), type(e).__name__, e.message
+ method, redact_uri(uri), type(e).__name__, e.args[0]
)
raise
@@ -137,7 +140,8 @@ class SimpleHttpClient(object):
# TODO: Do we ever want to log message contents?
logger.debug("post_urlencoded_get_json args: %s", args)
- query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+ query_bytes = urllib.parse.urlencode(
+ encode_urlencode_args(args), True).encode("utf8")
actual_headers = {
b"Content-Type": [b"application/x-www-form-urlencoded"],
@@ -148,15 +152,14 @@ class SimpleHttpClient(object):
response = yield self.request(
"POST",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
- bodyProducer=FileBodyProducer(StringIO(query_bytes))
+ data=query_bytes
)
- body = yield make_deferred_yieldable(readBody(response))
-
if 200 <= response.code < 300:
- defer.returnValue(json.loads(body))
+ body = yield make_deferred_yieldable(treq.json_content(response))
+ defer.returnValue(body)
else:
raise HttpResponseException(response.code, response.phrase, body)
@@ -191,9 +194,9 @@ class SimpleHttpClient(object):
response = yield self.request(
"POST",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
- bodyProducer=FileBodyProducer(StringIO(json_str))
+ data=json_str
)
body = yield make_deferred_yieldable(readBody(response))
@@ -248,7 +251,7 @@ class SimpleHttpClient(object):
ValueError: if the response was not JSON
"""
if len(args):
- query_bytes = urllib.urlencode(args, True)
+ query_bytes = urllib.parse.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
json_str = encode_canonical_json(json_body)
@@ -262,9 +265,9 @@ class SimpleHttpClient(object):
response = yield self.request(
"PUT",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
- bodyProducer=FileBodyProducer(StringIO(json_str))
+ data=json_str
)
body = yield make_deferred_yieldable(readBody(response))
@@ -293,7 +296,7 @@ class SimpleHttpClient(object):
HttpResponseException on a non-2xx HTTP response.
"""
if len(args):
- query_bytes = urllib.urlencode(args, True)
+ query_bytes = urllib.parse.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
actual_headers = {
@@ -304,7 +307,7 @@ class SimpleHttpClient(object):
response = yield self.request(
"GET",
- uri.encode("ascii"),
+ uri,
headers=Headers(actual_headers),
)
@@ -339,13 +342,14 @@ class SimpleHttpClient(object):
response = yield self.request(
"GET",
- url.encode("ascii"),
+ url,
headers=Headers(actual_headers),
)
resp_headers = dict(response.headers.getAllRawHeaders())
- if 'Content-Length' in resp_headers and resp_headers['Content-Length'] > max_size:
+ if (b'Content-Length' in resp_headers and
+ int(resp_headers[b'Content-Length']) > max_size):
logger.warn("Requested URL is too large > %r bytes" % (self.max_size,))
raise SynapseError(
502,
@@ -378,7 +382,12 @@ class SimpleHttpClient(object):
)
defer.returnValue(
- (length, resp_headers, response.request.absoluteURI, response.code),
+ (
+ length,
+ resp_headers,
+ response.request.absoluteURI.decode('ascii'),
+ response.code,
+ ),
)
@@ -434,12 +443,12 @@ class CaptchaServerHttpClient(SimpleHttpClient):
@defer.inlineCallbacks
def post_urlencoded_get_raw(self, url, args={}):
- query_bytes = urllib.urlencode(encode_urlencode_args(args), True)
+ query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True)
response = yield self.request(
"POST",
- url.encode("ascii"),
- bodyProducer=FileBodyProducer(StringIO(query_bytes)),
+ url,
+ data=query_bytes,
headers=Headers({
b"Content-Type": [b"application/x-www-form-urlencoded"],
b"User-Agent": [self.user_agent],
@@ -463,9 +472,9 @@ class SpiderEndpointFactory(object):
def endpointForURI(self, uri):
logger.info("Getting endpoint for %s", uri.toBytes())
- if uri.scheme == "http":
+ if uri.scheme == b"http":
endpoint_factory = HostnameEndpoint
- elif uri.scheme == "https":
+ elif uri.scheme == b"https":
tlsCreator = self.policyForHTTPS.creatorForNetloc(uri.host, uri.port)
def endpoint_factory(reactor, host, port, **kw):
@@ -510,7 +519,7 @@ def encode_urlencode_args(args):
def encode_urlencode_arg(arg):
- if isinstance(arg, unicode):
+ if isinstance(arg, text_type):
return arg.encode('utf-8')
elif isinstance(arg, list):
return [encode_urlencode_arg(i) for i in arg]
@@ -542,26 +551,3 @@ class InsecureInterceptableContextFactory(ssl.ContextFactory):
def creatorForNetloc(self, hostname, port):
return self
-
-
-class FileBodyProducer(TwistedFileBodyProducer):
- """Workaround for https://twistedmatrix.com/trac/ticket/8473
-
- We override the pauseProducing and resumeProducing methods in twisted's
- FileBodyProducer so that they do not raise exceptions if the task has
- already completed.
- """
-
- def pauseProducing(self):
- try:
- super(FileBodyProducer, self).pauseProducing()
- except task.TaskDone:
- # task has already completed
- pass
-
- def resumeProducing(self):
- try:
- super(FileBodyProducer, self).resumeProducing()
- except task.NotPaused:
- # task was not paused (probably because it had already completed)
- pass
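Reviewer note: the net effect of the client.py changes is that callers pass text URIs and byte bodies while treq takes over body production and JSON decoding. A minimal sketch of the new request path, assuming an already-constructed SimpleHttpClient inside an inlineCallbacks function (the URI and payload are illustrative):

    # mirrors what post_urlencoded_get_json now does internally
    query_bytes = urllib.parse.urlencode({"key": "value"}, True).encode("utf8")
    response = yield client.request(
        "POST",
        "https://example.com/endpoint",   # text URI, no .encode("ascii")
        headers=Headers({b"Content-Type": [b"application/x-www-form-urlencoded"]}),
        data=query_bytes,                 # raw bytes, no FileBodyProducer wrapper
    )
    body = yield make_deferred_yieldable(treq.json_content(response))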
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index b0c9369519..91025037a3 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -108,7 +108,7 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
Args:
reactor: Twisted reactor.
- destination (bytes): The name of the server to connect to.
+ destination (unicode): The name of the server to connect to.
tls_client_options_factory
(synapse.crypto.context_factory.ClientTLSOptionsFactory):
Factory which generates TLS options for client connections.
@@ -126,10 +126,17 @@ def matrix_federation_endpoint(reactor, destination, tls_client_options_factory=
transport_endpoint = HostnameEndpoint
default_port = 8008
else:
+ # the SNI string should be the same as the Host header, minus the port.
+ # as per https://github.com/matrix-org/synapse/issues/2525#issuecomment-336896777,
+ # the Host header and SNI should therefore be the server_name of the remote
+ # server.
+ tls_options = tls_client_options_factory.get_options(domain)
+
def transport_endpoint(reactor, host, port, timeout):
return wrapClientTLS(
- tls_client_options_factory.get_options(host),
- HostnameEndpoint(reactor, host, port, timeout=timeout))
+ tls_options,
+ HostnameEndpoint(reactor, host, port, timeout=timeout),
+ )
default_port = 8448
if port is None:
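Reviewer note: the TLS options are now built once from the parsed server_name (the domain) rather than from whichever host the endpoint eventually connects to, so the SNI we send matches the Host header even when an SRV record points at a different physical host. An illustrative sketch of the intent (hostnames are made up):

    # "matrix.example.com" is the remote server_name; its SRV record may point at
    # "backend1.example.net", but SNI (like the Host header) must still say
    # "matrix.example.com"
    tls_options = tls_client_options_factory.get_options("matrix.example.com")
    endpoint = wrapClientTLS(
        tls_options,
        HostnameEndpoint(reactor, "backend1.example.net", 8448, timeout=10),
    )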
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index b34bb8e31a..14b12cd1c4 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,19 +17,22 @@ import cgi
import logging
import random
import sys
-import urllib
+from io import BytesIO
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from six import PY3, string_types
+from six.moves import urllib
-from canonicaljson import encode_canonical_json, json
+import attr
+import treq
+from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
-from twisted.internet import defer, protocol, reactor
+from twisted.internet import defer, protocol
from twisted.internet.error import DNSLookupError
+from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, HTTPConnectionPool, readBody
+from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -40,14 +43,12 @@ from synapse.api.errors import (
HttpResponseException,
SynapseError,
)
-from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util import logcontext
-from synapse.util.async_helpers import add_timeout_to_deferred
+from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
+from synapse.util.metrics import Measure
logger = logging.getLogger(__name__)
-outbound_logger = logging.getLogger("synapse.http.outbound")
outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
"", ["method"])
@@ -58,20 +59,119 @@ incoming_responses_counter = Counter("synapse_http_matrixfederationclient_respon
MAX_LONG_RETRIES = 10
MAX_SHORT_RETRIES = 3
+if PY3:
+ MAXINT = sys.maxsize
+else:
+ MAXINT = sys.maxint
+
class MatrixFederationEndpointFactory(object):
def __init__(self, hs):
+ self.reactor = hs.get_reactor()
self.tls_client_options_factory = hs.tls_client_options_factory
def endpointForURI(self, uri):
- destination = uri.netloc
+ destination = uri.netloc.decode('ascii')
return matrix_federation_endpoint(
- reactor, destination, timeout=10,
+ self.reactor, destination, timeout=10,
tls_client_options_factory=self.tls_client_options_factory
)
+_next_id = 1
+
+
+@attr.s
+class MatrixFederationRequest(object):
+ method = attr.ib()
+ """HTTP method
+ :type: str
+ """
+
+ path = attr.ib()
+ """HTTP path
+ :type: str
+ """
+
+ destination = attr.ib()
+ """The remote server to send the HTTP request to.
+ :type: str"""
+
+ json = attr.ib(default=None)
+ """JSON to send in the body.
+ :type: dict|None
+ """
+
+ json_callback = attr.ib(default=None)
+ """A callback to generate the JSON.
+ :type: func|None
+ """
+
+ query = attr.ib(default=None)
+ """Query arguments.
+ :type: dict|None
+ """
+
+ txn_id = attr.ib(default=None)
+ """Unique ID for this request (for logging)
+ :type: str|None
+ """
+
+ def __attrs_post_init__(self):
+ global _next_id
+ self.txn_id = "%s-O-%s" % (self.method, _next_id)
+ _next_id = (_next_id + 1) % (MAXINT - 1)
+
+ def get_json(self):
+ if self.json_callback:
+ return self.json_callback()
+ return self.json
+
+
+@defer.inlineCallbacks
+def _handle_json_response(reactor, timeout_sec, request, response):
+ """
+ Reads the JSON body of a response, with a timeout
+
+ Args:
+ reactor (IReactor): twisted reactor, for the timeout
+ timeout_sec (float): number of seconds to wait for response to complete
+ request (MatrixFederationRequest): the request that triggered the response
+ response (IResponse): response to the request
+
+ Returns:
+ dict: parsed JSON response
+ """
+ try:
+ check_content_type_is_json(response.headers)
+
+ d = treq.json_content(response)
+ d = timeout_deferred(
+ d,
+ timeout=timeout_sec,
+ reactor=reactor,
+ )
+
+ body = yield make_deferred_yieldable(d)
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Error reading response: %s",
+ request.txn_id,
+ request.destination,
+ e,
+ )
+ raise
+ logger.info(
+ "{%s} [%s] Completed: %d %s",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
+ defer.returnValue(body)
+
+
class MatrixFederationHttpClient(object):
"""HTTP client used to talk to other homeservers over the federation
protocol. Send client certificates and signs requests.
@@ -85,7 +185,9 @@ class MatrixFederationHttpClient(object):
self.hs = hs
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
+ reactor = hs.get_reactor()
pool = HTTPConnectionPool(reactor)
+ pool.retryAutomatically = False
pool.maxPersistentPerHost = 5
pool.cachedConnectionTimeout = 2 * 60
self.agent = Agent.usingEndpointFactory(
@@ -93,28 +195,36 @@ class MatrixFederationHttpClient(object):
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
- self.version_string = hs.version_string
- self._next_id = 1
+ self.version_string = hs.version_string.encode('ascii')
+ self.default_timeout = 60
- def _create_url(self, destination, path_bytes, param_bytes, query_bytes):
- return urlparse.urlunparse(
- ("matrix", destination, path_bytes, param_bytes, query_bytes, "")
- )
+ def schedule(x):
+ reactor.callLater(_EPSILON, x)
+
+ self._cooperator = Cooperator(scheduler=schedule)
@defer.inlineCallbacks
- def _request(self, destination, method, path,
- body_callback, headers_dict={}, param_bytes=b"",
- query_bytes=b"", retry_on_dns_fail=True,
- timeout=None, long_retries=False,
- ignore_backoff=False,
- backoff_on_404=False):
- """ Creates and sends a request to the given server
+ def _send_request(
+ self,
+ request,
+ retry_on_dns_fail=True,
+ timeout=None,
+ long_retries=False,
+ ignore_backoff=False,
+ backoff_on_404=False
+ ):
+ """
+ Sends a request to the given server.
+
Args:
- destination (str): The remote server to send the HTTP request to.
- method (str): HTTP method
- path (str): The HTTP path
+ request (MatrixFederationRequest): details of request to be sent
+
+ timeout (int|None): number of milliseconds to wait for the response headers
+ (including connecting to the server). 60s by default.
+
ignore_backoff (bool): true to ignore the historical backoff data
and try the request anyway.
+
backoff_on_404 (bool): Back off if we get a 404
Returns:
@@ -132,38 +242,39 @@ class MatrixFederationHttpClient(object):
(May also fail with plenty of other Exceptions for things like DNS
failures, connection failures, SSL failures.)
"""
+ if timeout:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
+
if (
self.hs.config.federation_domain_whitelist is not None and
- destination not in self.hs.config.federation_domain_whitelist
+ request.destination not in self.hs.config.federation_domain_whitelist
):
- raise FederationDeniedError(destination)
+ raise FederationDeniedError(request.destination)
limiter = yield synapse.util.retryutils.get_retry_limiter(
- destination,
+ request.destination,
self.clock,
self._store,
backoff_on_404=backoff_on_404,
ignore_backoff=ignore_backoff,
)
- destination = destination.encode("ascii")
- path_bytes = path.encode("ascii")
- with limiter:
- headers_dict[b"User-Agent"] = [self.version_string]
- headers_dict[b"Host"] = [destination]
-
- url_bytes = self._create_url(
- destination, path_bytes, param_bytes, query_bytes
- )
-
- txn_id = "%s-O-%s" % (method, self._next_id)
- self._next_id = (self._next_id + 1) % (sys.maxint - 1)
+ method = request.method
+ destination = request.destination
+ path_bytes = request.path.encode("ascii")
+ if request.query:
+ query_bytes = encode_query_args(request.query)
+ else:
+ query_bytes = b""
- outbound_logger.info(
- "{%s} [%s] Sending request: %s %s",
- txn_id, destination, method, url_bytes
- )
+ headers_dict = {
+ "User-Agent": [self.version_string],
+ "Host": [request.destination],
+ }
+ with limiter:
# XXX: Would be much nicer to retry only at the transaction-layer
# (once we have reliable transactions in place)
if long_retries:
@@ -171,88 +282,119 @@ class MatrixFederationHttpClient(object):
else:
retries_left = MAX_SHORT_RETRIES
- http_url_bytes = urlparse.urlunparse(
- ("", "", path_bytes, param_bytes, query_bytes, "")
- )
-
- log_result = None
- try:
- while True:
- producer = None
- if body_callback:
- producer = body_callback(method, http_url_bytes, headers_dict)
-
- try:
- request_deferred = self.agent.request(
- method,
- url_bytes,
- Headers(headers_dict),
- producer
+ url = urllib.parse.urlunparse((
+ b"matrix", destination.encode("ascii"),
+ path_bytes, None, query_bytes, b"",
+ )).decode('ascii')
+
+ http_url = urllib.parse.urlunparse((
+ b"", b"",
+ path_bytes, None, query_bytes, b"",
+ )).decode('ascii')
+
+ while True:
+ try:
+ json = request.get_json()
+ if json:
+ data = encode_canonical_json(json)
+ headers_dict["Content-Type"] = ["application/json"]
+ self.sign_request(
+ destination, method, http_url, headers_dict, json
)
- add_timeout_to_deferred(
- request_deferred,
- timeout / 1000. if timeout else 60,
- self.hs.get_reactor(),
- cancelled_to_request_timed_out_error,
+ else:
+ data = None
+ self.sign_request(destination, method, http_url, headers_dict)
+
+ logger.info(
+ "{%s} [%s] Sending request: %s %s",
+ request.txn_id, destination, method, url
+ )
+
+ if data:
+ producer = FileBodyProducer(
+ BytesIO(data),
+ cooperator=self._cooperator
)
+ else:
+ producer = None
+
+ request_deferred = treq.request(
+ method,
+ url,
+ headers=Headers(headers_dict),
+ data=producer,
+ agent=self.agent,
+ reactor=self.hs.get_reactor(),
+ unbuffered=True
+ )
+
+ request_deferred = timeout_deferred(
+ request_deferred,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
+
+ with Measure(self.clock, "outbound_request"):
response = yield make_deferred_yieldable(
request_deferred,
)
- log_result = "%d %s" % (response.code, response.phrase,)
- break
- except Exception as e:
- if not retry_on_dns_fail and isinstance(e, DNSLookupError):
- logger.warn(
- "DNS Lookup failed to %s with %s",
- destination,
- e
- )
- log_result = "DNS Lookup failed to %s with %s" % (
- destination, e
- )
- raise
-
- logger.warn(
- "{%s} Sending request failed to %s: %s %s: %s",
- txn_id,
+ break
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Request failed: %s %s: %s",
+ request.txn_id,
+ destination,
+ method,
+ url,
+ _flatten_response_never_received(e),
+ )
+
+ if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+ raise
+
+ if retries_left and not timeout:
+ if long_retries:
+ delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
+ delay = min(delay, 60)
+ delay *= random.uniform(0.8, 1.4)
+ else:
+ delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
+ delay = min(delay, 2)
+ delay *= random.uniform(0.8, 1.4)
+
+ logger.debug(
+ "{%s} [%s] Waiting %ss before re-sending...",
+ request.txn_id,
destination,
- method,
- url_bytes,
- _flatten_response_never_received(e),
+ delay,
)
- log_result = _flatten_response_never_received(e)
-
- if retries_left and not timeout:
- if long_retries:
- delay = 4 ** (MAX_LONG_RETRIES + 1 - retries_left)
- delay = min(delay, 60)
- delay *= random.uniform(0.8, 1.4)
- else:
- delay = 0.5 * 2 ** (MAX_SHORT_RETRIES - retries_left)
- delay = min(delay, 2)
- delay *= random.uniform(0.8, 1.4)
-
- yield self.clock.sleep(delay)
- retries_left -= 1
- else:
- raise
- finally:
- outbound_logger.info(
- "{%s} [%s] Result: %s",
- txn_id,
- destination,
- log_result,
- )
+ yield self.clock.sleep(delay)
+ retries_left -= 1
+ else:
+ raise
+
+ logger.info(
+ "{%s} [%s] Got response headers: %d %s",
+ request.txn_id,
+ destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ )
if 200 <= response.code < 300:
pass
else:
# :'(
# Update transactions table?
- with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ d = treq.content(response)
+ d = timeout_deferred(
+ d,
+ timeout=_sec_timeout,
+ reactor=self.hs.get_reactor(),
+ )
+ body = yield make_deferred_yieldable(d)
raise HttpResponseException(
response.code, response.phrase, body
)
@@ -297,11 +439,11 @@ class MatrixFederationHttpClient(object):
auth_headers = []
for key, sig in request["signatures"][self.server_name].items():
- auth_headers.append(bytes(
+ auth_headers.append((
"X-Matrix origin=%s,key=\"%s\",sig=\"%s\"" % (
self.server_name, key, sig,
- )
- ))
+ )).encode('ascii')
+ )
headers_dict[b"Authorization"] = auth_headers
@@ -346,38 +488,27 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist
"""
- if not json_data_callback:
- def json_data_callback():
- return data
+ request = MatrixFederationRequest(
+ method="PUT",
+ destination=destination,
+ path=path,
+ query=args,
+ json_callback=json_data_callback,
+ json=data,
+ )
- def body_callback(method, url_bytes, headers_dict):
- json_data = json_data_callback()
- self.sign_request(
- destination, method, url_bytes, headers_dict, json_data
- )
- producer = _JsonProducer(json_data)
- return producer
-
- response = yield self._request(
- destination,
- "PUT",
- path,
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
- query_bytes=encode_query_args(args),
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
backoff_on_404=backoff_on_404,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
- defer.returnValue(json.loads(body))
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
+ defer.returnValue(body)
@defer.inlineCallbacks
def post_json(self, destination, path, data={}, long_retries=False,
@@ -411,32 +542,30 @@ class MatrixFederationHttpClient(object):
is not on our federation whitelist
"""
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(
- destination, method, url_bytes, headers_dict, data
- )
- return _JsonProducer(data)
-
- response = yield self._request(
- destination,
- "POST",
- path,
- query_bytes=encode_query_args(args),
- body_callback=body_callback,
- headers_dict={"Content-Type": ["application/json"]},
+ request = MatrixFederationRequest(
+ method="POST",
+ destination=destination,
+ path=path,
+ query=args,
+ json=data,
+ )
+
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
+ if timeout:
+ _sec_timeout = timeout / 1000
+ else:
+ _sec_timeout = self.default_timeout
- defer.returnValue(json.loads(body))
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), _sec_timeout, request, response,
+ )
+ defer.returnValue(body)
@defer.inlineCallbacks
def get_json(self, destination, path, args=None, retry_on_dns_fail=True,
@@ -471,29 +600,24 @@ class MatrixFederationHttpClient(object):
logger.debug("Query bytes: %s Retry DNS: %s", args, retry_on_dns_fail)
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
+ request = MatrixFederationRequest(
+ method="GET",
+ destination=destination,
+ path=path,
+ query=args,
+ )
- response = yield self._request(
- destination,
- "GET",
- path,
- query_bytes=encode_query_args(args),
- body_callback=body_callback,
+ response = yield self._send_request(
+ request,
retry_on_dns_fail=retry_on_dns_fail,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
-
- defer.returnValue(json.loads(body))
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
+ defer.returnValue(body)
@defer.inlineCallbacks
def delete_json(self, destination, path, long_retries=False,
@@ -523,26 +647,24 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
+ request = MatrixFederationRequest(
+ method="DELETE",
+ destination=destination,
+ path=path,
+ query=args,
+ )
- response = yield self._request(
- destination,
- "DELETE",
- path,
- query_bytes=encode_query_args(args),
- headers_dict={"Content-Type": ["application/json"]},
+ response = yield self._send_request(
+ request,
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
)
- if 200 <= response.code < 300:
- # We need to update the transactions table to say it was sent?
- check_content_type_is_json(response.headers)
-
- with logcontext.PreserveLoggingContext():
- body = yield readBody(response)
-
- defer.returnValue(json.loads(body))
+ body = yield _handle_json_response(
+ self.hs.get_reactor(), self.default_timeout, request, response,
+ )
+ defer.returnValue(body)
@defer.inlineCallbacks
def get_file(self, destination, path, output_stream, args={},
@@ -569,26 +691,15 @@ class MatrixFederationHttpClient(object):
Fails with ``FederationDeniedError`` if this destination
is not on our federation whitelist
"""
+ request = MatrixFederationRequest(
+ method="GET",
+ destination=destination,
+ path=path,
+ query=args,
+ )
- encoded_args = {}
- for k, vs in args.items():
- if isinstance(vs, string_types):
- vs = [vs]
- encoded_args[k] = [v.encode("UTF-8") for v in vs]
-
- query_bytes = urllib.urlencode(encoded_args, True)
- logger.debug("Query bytes: %s Retry DNS: %s", query_bytes, retry_on_dns_fail)
-
- def body_callback(method, url_bytes, headers_dict):
- self.sign_request(destination, method, url_bytes, headers_dict)
- return None
-
- response = yield self._request(
- destination,
- "GET",
- path,
- query_bytes=query_bytes,
- body_callback=body_callback,
+ response = yield self._send_request(
+ request,
retry_on_dns_fail=retry_on_dns_fail,
ignore_backoff=ignore_backoff,
)
@@ -596,14 +707,25 @@ class MatrixFederationHttpClient(object):
headers = dict(response.headers.getAllRawHeaders())
try:
- with logcontext.PreserveLoggingContext():
- length = yield _readBodyToFile(
- response, output_stream, max_size
- )
- except Exception:
- logger.exception("Failed to download body")
+ d = _readBodyToFile(response, output_stream, max_size)
+ d.addTimeout(self.default_timeout, self.hs.get_reactor())
+ length = yield make_deferred_yieldable(d)
+ except Exception as e:
+ logger.warn(
+ "{%s} [%s] Error reading response: %s",
+ request.txn_id,
+ request.destination,
+ e,
+ )
raise
-
+ logger.info(
+ "{%s} [%s] Completed: %d %s [%d bytes]",
+ request.txn_id,
+ request.destination,
+ response.code,
+ response.phrase.decode('ascii', errors='replace'),
+ length,
+ )
defer.returnValue((length, headers))
@@ -639,30 +761,6 @@ def _readBodyToFile(response, stream, max_size):
return d
-class _JsonProducer(object):
- """ Used by the twisted http client to create the HTTP body from json
- """
- def __init__(self, jsn):
- self.reset(jsn)
-
- def reset(self, jsn):
- self.body = encode_canonical_json(jsn)
- self.length = len(self.body)
-
- def startProducing(self, consumer):
- consumer.write(self.body)
- return defer.succeed(None)
-
- def pauseProducing(self):
- pass
-
- def stopProducing(self):
- pass
-
- def resumeProducing(self):
- pass
-
-
def _flatten_response_never_received(e):
if hasattr(e, "reasons"):
reasons = ", ".join(
@@ -693,7 +791,7 @@ def check_content_type_is_json(headers):
"No Content-Type header"
)
- c_type = c_type[0] # only the first header
+ c_type = c_type[0].decode('ascii') # only the first header
val, options = cgi.parse_header(c_type)
if val != "application/json":
raise RuntimeError(
@@ -711,6 +809,6 @@ def encode_query_args(args):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
- query_bytes = urllib.urlencode(encoded_args, True)
+ query_bytes = urllib.parse.urlencode(encoded_args, True)
- return query_bytes
+ return query_bytes.encode('utf8')
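Reviewer note: the federation client methods (get_json, put_json, post_json, delete_json, get_file) now all follow the same shape inside their inlineCallbacks bodies: build a MatrixFederationRequest, send it via _send_request, then read the body with a timeout. A condensed sketch pieced together from the hunks above (destination and path are illustrative):

    request = MatrixFederationRequest(
        method="GET",
        destination="remote.example.com",
        path="/_matrix/federation/v1/version",
    )
    response = yield self._send_request(request, retry_on_dns_fail=True)
    body = yield _handle_json_response(
        self.hs.get_reactor(), self.default_timeout, request, response,
    )
    defer.returnValue(body)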
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index 72c2654678..fedb4e6b18 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -162,7 +162,7 @@ class RequestMetrics(object):
with _in_flight_requests_lock:
_in_flight_requests.add(self)
- def stop(self, time_sec, request):
+ def stop(self, time_sec, response_code, sent_bytes):
with _in_flight_requests_lock:
_in_flight_requests.discard(self)
@@ -179,35 +179,35 @@ class RequestMetrics(object):
)
return
- response_code = str(request.code)
+ response_code = str(response_code)
- outgoing_responses_counter.labels(request.method, response_code).inc()
+ outgoing_responses_counter.labels(self.method, response_code).inc()
- response_count.labels(request.method, self.name, tag).inc()
+ response_count.labels(self.method, self.name, tag).inc()
- response_timer.labels(request.method, self.name, tag, response_code).observe(
+ response_timer.labels(self.method, self.name, tag, response_code).observe(
time_sec - self.start
)
resource_usage = context.get_resource_usage()
- response_ru_utime.labels(request.method, self.name, tag).inc(
+ response_ru_utime.labels(self.method, self.name, tag).inc(
resource_usage.ru_utime,
)
- response_ru_stime.labels(request.method, self.name, tag).inc(
+ response_ru_stime.labels(self.method, self.name, tag).inc(
resource_usage.ru_stime,
)
- response_db_txn_count.labels(request.method, self.name, tag).inc(
+ response_db_txn_count.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_count
)
- response_db_txn_duration.labels(request.method, self.name, tag).inc(
+ response_db_txn_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_txn_duration_sec
)
- response_db_sched_duration.labels(request.method, self.name, tag).inc(
+ response_db_sched_duration.labels(self.method, self.name, tag).inc(
resource_usage.db_sched_duration_sec
)
- response_size.labels(request.method, self.name, tag).inc(request.sentLength)
+ response_size.labels(self.method, self.name, tag).inc(sent_bytes)
# We always call this at the end to ensure that we update the metrics
# regardless of whether a call to /metrics while the request was in
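Reviewer note: RequestMetrics.stop() no longer receives the Request object; call sites now pass the finished response's status code and sent byte count directly, which is what the site.py hunk further down does:

    # before: self.request_metrics.stop(self.finish_time, self)
    self.request_metrics.stop(self.finish_time, self.code, self.sentLength)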
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2d5c23e673..b4b25cab19 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -84,10 +84,21 @@ def wrap_json_request_handler(h):
logger.info(
"%s SynapseError: %s - %s", request, code, e.msg
)
- respond_with_json(
- request, code, e.error_dict(), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- )
+
+ # Only respond with an error response if we haven't already started
+            # writing, otherwise let's just kill the connection
+ if request.startedWriting:
+ if request.transport:
+ try:
+ request.transport.abortConnection()
+ except Exception:
+ # abortConnection throws if the connection is already closed
+ pass
+ else:
+ respond_with_json(
+ request, code, e.error_dict(), send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
except Exception:
# failure.Failure() fishes the original Failure out
@@ -100,16 +111,26 @@ def wrap_json_request_handler(h):
request,
f.getTraceback().rstrip(),
)
- respond_with_json(
- request,
- 500,
- {
- "error": "Internal server error",
- "errcode": Codes.UNKNOWN,
- },
- send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- )
+ # Only respond with an error response if we haven't already started
+            # writing, otherwise let's just kill the connection
+ if request.startedWriting:
+ if request.transport:
+ try:
+ request.transport.abortConnection()
+ except Exception:
+ # abortConnection throws if the connection is already closed
+ pass
+ else:
+ respond_with_json(
+ request,
+ 500,
+ {
+ "error": "Internal server error",
+ "errcode": Codes.UNKNOWN,
+ },
+ send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
return wrap_async_request_handler(wrapped_request_handler)
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 88ed3714f9..e508c0bd4f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -75,17 +75,35 @@ class SynapseRequest(Request):
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
- self.method,
+ self.get_method(),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
self.site.site_tag,
)
def get_request_id(self):
- return "%s-%i" % (self.method, self.request_seq)
+ return "%s-%i" % (self.get_method(), self.request_seq)
def get_redacted_uri(self):
- return redact_uri(self.uri)
+ uri = self.uri
+ if isinstance(uri, bytes):
+ uri = self.uri.decode('ascii')
+ return redact_uri(uri)
+
+ def get_method(self):
+        """Gets the method associated with the request (or placeholder if no
+ method has yet been received).
+
+ Note: This is necessary as the placeholder value in twisted is str
+ rather than bytes, so we need to sanitise `self.method`.
+
+ Returns:
+ str
+ """
+ method = self.method
+ if isinstance(method, bytes):
+ method = self.method.decode('ascii')
+ return method
def get_user_agent(self):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
@@ -116,7 +134,7 @@ class SynapseRequest(Request):
# dispatching to the handler, so that the handler
# can update the servlet name in the request
# metrics
- requests_counter.labels(self.method,
+ requests_counter.labels(self.get_method(),
self.request_metrics.name).inc()
@contextlib.contextmanager
@@ -204,14 +222,14 @@ class SynapseRequest(Request):
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
- self.start_time, name=servlet_name, method=self.method,
+ self.start_time, name=servlet_name, method=self.get_method(),
)
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
self.site.site_tag,
- self.method,
+ self.get_method(),
self.get_redacted_uri()
)
@@ -277,15 +295,15 @@ class SynapseRequest(Request):
int(usage.db_txn_count),
self.sentLength,
code,
- self.method,
+ self.get_method(),
self.get_redacted_uri(),
- self.clientproto,
+ self.clientproto.decode('ascii', errors='replace'),
user_agent,
usage.evt_db_fetch_count,
)
try:
- self.request_metrics.stop(self.finish_time, self)
+ self.request_metrics.stop(self.finish_time, self.code, self.sentLength)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
@@ -305,7 +323,7 @@ class XForwardedForRequest(SynapseRequest):
C{b"-"}.
"""
return self.requestHeaders.getRawHeaders(
- b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip()
+ b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip().decode('ascii')
class SynapseRequestFactory(object):
|