diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 250bb1ef91..5ee4d528d2 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -28,11 +28,11 @@ from canonicaljson import encode_canonical_json
from prometheus_client import Counter
from signedjson.sign import sign_json
-from twisted.internet import defer, protocol
+from twisted.internet import defer, protocol, task
from twisted.internet.error import DNSLookupError
from twisted.internet.task import _EPSILON, Cooperator
from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
+from twisted.web.client import FileBodyProducer
from twisted.web.http_headers import Headers
import synapse.metrics
@@ -44,7 +44,7 @@ from synapse.api.errors import (
RequestSendFailed,
SynapseError,
)
-from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
from synapse.util.async_helpers import timeout_deferred
from synapse.util.logcontext import make_deferred_yieldable
from synapse.util.metrics import Measure
@@ -66,20 +66,6 @@ else:
MAXINT = sys.maxint
-class MatrixFederationEndpointFactory(object):
- def __init__(self, hs):
- self.reactor = hs.get_reactor()
- self.tls_client_options_factory = hs.tls_client_options_factory
-
- def endpointForURI(self, uri):
- destination = uri.netloc.decode('ascii')
-
- return matrix_federation_endpoint(
- self.reactor, destination, timeout=10,
- tls_client_options_factory=self.tls_client_options_factory
- )
-
-
_next_id = 1
@@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object):
self.signing_key = hs.config.signing_key[0]
self.server_name = hs.hostname
reactor = hs.get_reactor()
- pool = HTTPConnectionPool(reactor)
- pool.retryAutomatically = False
- pool.maxPersistentPerHost = 5
- pool.cachedConnectionTimeout = 2 * 60
- self.agent = Agent.usingEndpointFactory(
- reactor, MatrixFederationEndpointFactory(hs), pool=pool
+
+ self.agent = MatrixFederationAgent(
+ hs.get_reactor(),
+ hs.tls_client_options_factory,
)
self.clock = hs.get_clock()
self._store = hs.get_datastore()
@@ -271,7 +255,6 @@ class MatrixFederationHttpClient(object):
headers_dict = {
b"User-Agent": [self.version_string_bytes],
- b"Host": [destination_bytes],
}
with limiter:
@@ -303,7 +286,7 @@ class MatrixFederationHttpClient(object):
json,
)
data = encode_canonical_json(json)
- producer = FileBodyProducer(
+ producer = QuieterFileBodyProducer(
BytesIO(data),
cooperator=self._cooperator,
)
@@ -316,9 +299,9 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
logger.info(
- "{%s} [%s] Sending request: %s %s",
+ "{%s} [%s] Sending request: %s %s; timeout %fs",
request.txn_id, request.destination, request.method,
- url_str,
+ url_str, _sec_timeout,
)
try:
@@ -338,12 +321,11 @@ class MatrixFederationHttpClient(object):
reactor=self.hs.get_reactor(),
)
- response = yield make_deferred_yieldable(
- request_deferred,
- )
+ response = yield request_deferred
except DNSLookupError as e:
raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
except Exception as e:
+ logger.info("Failed to send request: %s", e)
raise_from(RequestSendFailed(e, can_retry=True), e)
logger.info(
@@ -857,3 +839,16 @@ def encode_query_args(args):
query_bytes = urllib.parse.urlencode(encoded_args, True)
return query_bytes.encode('utf8')
+
+
+class QuieterFileBodyProducer(FileBodyProducer):
+ """Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops.
+
+ Workaround for https://github.com/matrix-org/synapse/issues/4003 /
+ https://twistedmatrix.com/trac/ticket/6528
+ """
+ def stopProducing(self):
+ try:
+ FileBodyProducer.stopProducing(self)
+ except task.TaskStopped:
+ pass
|