summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py55
1 files changed, 25 insertions, 30 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 250bb1ef91..5ee4d528d2 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -28,11 +28,11 @@ from canonicaljson import encode_canonical_json
 from prometheus_client import Counter
 from signedjson.sign import sign_json
 
-from twisted.internet import defer, protocol
+from twisted.internet import defer, protocol, task
 from twisted.internet.error import DNSLookupError
 from twisted.internet.task import _EPSILON, Cooperator
 from twisted.web._newclient import ResponseDone
-from twisted.web.client import Agent, FileBodyProducer, HTTPConnectionPool
+from twisted.web.client import FileBodyProducer
 from twisted.web.http_headers import Headers
 
 import synapse.metrics
@@ -44,7 +44,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
-from synapse.http.endpoint import matrix_federation_endpoint
+from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent
 from synapse.util.async_helpers import timeout_deferred
 from synapse.util.logcontext import make_deferred_yieldable
 from synapse.util.metrics import Measure
@@ -66,20 +66,6 @@ else:
     MAXINT = sys.maxint
 
 
-class MatrixFederationEndpointFactory(object):
-    def __init__(self, hs):
-        self.reactor = hs.get_reactor()
-        self.tls_client_options_factory = hs.tls_client_options_factory
-
-    def endpointForURI(self, uri):
-        destination = uri.netloc.decode('ascii')
-
-        return matrix_federation_endpoint(
-            self.reactor, destination, timeout=10,
-            tls_client_options_factory=self.tls_client_options_factory
-        )
-
-
 _next_id = 1
 
 
@@ -187,12 +173,10 @@ class MatrixFederationHttpClient(object):
         self.signing_key = hs.config.signing_key[0]
         self.server_name = hs.hostname
         reactor = hs.get_reactor()
-        pool = HTTPConnectionPool(reactor)
-        pool.retryAutomatically = False
-        pool.maxPersistentPerHost = 5
-        pool.cachedConnectionTimeout = 2 * 60
-        self.agent = Agent.usingEndpointFactory(
-            reactor, MatrixFederationEndpointFactory(hs), pool=pool
+
+        self.agent = MatrixFederationAgent(
+            hs.get_reactor(),
+            hs.tls_client_options_factory,
         )
         self.clock = hs.get_clock()
         self._store = hs.get_datastore()
@@ -271,7 +255,6 @@ class MatrixFederationHttpClient(object):
 
         headers_dict = {
             b"User-Agent": [self.version_string_bytes],
-            b"Host": [destination_bytes],
         }
 
         with limiter:
@@ -303,7 +286,7 @@ class MatrixFederationHttpClient(object):
                             json,
                         )
                         data = encode_canonical_json(json)
-                        producer = FileBodyProducer(
+                        producer = QuieterFileBodyProducer(
                             BytesIO(data),
                             cooperator=self._cooperator,
                         )
@@ -316,9 +299,9 @@ class MatrixFederationHttpClient(object):
                     headers_dict[b"Authorization"] = auth_headers
 
                     logger.info(
-                        "{%s} [%s] Sending request: %s %s",
+                        "{%s} [%s] Sending request: %s %s; timeout %fs",
                         request.txn_id, request.destination, request.method,
-                        url_str,
+                        url_str, _sec_timeout,
                     )
 
                     try:
@@ -338,12 +321,11 @@ class MatrixFederationHttpClient(object):
                                 reactor=self.hs.get_reactor(),
                             )
 
-                            response = yield make_deferred_yieldable(
-                                request_deferred,
-                            )
+                            response = yield request_deferred
                     except DNSLookupError as e:
                         raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
                     except Exception as e:
+                        logger.info("Failed to send request: %s", e)
                         raise_from(RequestSendFailed(e, can_retry=True), e)
 
                     logger.info(
@@ -857,3 +839,16 @@ def encode_query_args(args):
     query_bytes = urllib.parse.urlencode(encoded_args, True)
 
     return query_bytes.encode('utf8')
+
+
+class QuieterFileBodyProducer(FileBodyProducer):
+    """Wrapper for FileBodyProducer that avoids CRITICAL errors when the connection drops.
+
+    Workaround for https://github.com/matrix-org/synapse/issues/4003 /
+    https://twistedmatrix.com/trac/ticket/6528
+    """
+    def stopProducing(self):
+        try:
+            FileBodyProducer.stopProducing(self)
+        except task.TaskStopped:
+            pass