summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/4358.misc1
-rw-r--r--synapse/api/errors.py18
-rw-r--r--synapse/federation/transaction_queue.py19
-rw-r--r--synapse/http/matrixfederationclient.py105
-rw-r--r--synapse/rest/media/v1/media_repository.py7
-rw-r--r--tests/http/test_fedclient.py13
6 files changed, 119 insertions, 44 deletions
diff --git a/changelog.d/4358.misc b/changelog.d/4358.misc
new file mode 100644
index 0000000000..020dacb547
--- /dev/null
+++ b/changelog.d/4358.misc
@@ -0,0 +1 @@
+Add better logging for unexpected errors while sending transactions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 48b903374d..0b464834ce 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -348,6 +348,24 @@ class IncompatibleRoomVersionError(SynapseError):
         )
 
 
+class RequestSendFailed(RuntimeError):
+    """Sending a HTTP request over federation failed due to not being able to
+    talk to the remote server for some reason.
+
+    This exception is used to differentiate "expected" errors that arise due to
+    networking (e.g. DNS failures, connection timeouts etc), versus unexpected
+    errors (like programming errors).
+    """
+    def __init__(self, inner_exception, can_retry):
+        super(RequestSendFailed, self).__init__(
+            "Failed to send request: %s: %s" % (
+                type(inner_exception).__name__, inner_exception,
+            )
+        )
+        self.inner_exception = inner_exception
+        self.can_retry = can_retry
+
+
 def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
     """ Utility method for constructing an error response for client-server
     interactions.
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 099ace28c1..4640513497 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -22,7 +22,11 @@ from prometheus_client import Counter
 from twisted.internet import defer
 
 import synapse.metrics
-from synapse.api.errors import FederationDeniedError, HttpResponseException
+from synapse.api.errors import (
+    FederationDeniedError,
+    HttpResponseException,
+    RequestSendFailed,
+)
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 from synapse.metrics import (
     LaterGauge,
@@ -518,11 +522,16 @@ class TransactionQueue(object):
             )
         except FederationDeniedError as e:
             logger.info(e)
-        except Exception as e:
-            logger.warn(
-                "TX [%s] Failed to send transaction: %s",
+        except RequestSendFailed as e:
+            logger.warning("(TX [%s] Failed to send transaction: %s", destination, e)
+
+            for p, _ in pending_pdus:
+                logger.info("Failed to send event %s to %s", p.event_id,
+                            destination)
+        except Exception:
+            logger.exception(
+                "TX [%s] Failed to send transaction",
                 destination,
-                e,
             )
             for p, _ in pending_pdus:
                 logger.info("Failed to send event %s to %s", p.event_id,
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 24b6110c20..7a2b4f0957 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -19,7 +19,7 @@ import random
 import sys
 from io import BytesIO
 
-from six import PY3, string_types
+from six import PY3, raise_from, string_types
 from six.moves import urllib
 
 import attr
@@ -41,6 +41,7 @@ from synapse.api.errors import (
     Codes,
     FederationDeniedError,
     HttpResponseException,
+    RequestSendFailed,
     SynapseError,
 )
 from synapse.http.endpoint import matrix_federation_endpoint
@@ -231,7 +232,7 @@ class MatrixFederationHttpClient(object):
             Deferred: resolves with the http response object on success.
 
             Fails with ``HttpResponseException``: if we get an HTTP response
-                code >= 300.
+                code >= 300 (except 429).
 
             Fails with ``NotRetryingDestination`` if we are not yet ready
                 to retry this server.
@@ -239,8 +240,8 @@ class MatrixFederationHttpClient(object):
             Fails with ``FederationDeniedError`` if this destination
                 is not on our federation whitelist
 
-            (May also fail with plenty of other Exceptions for things like DNS
-                failures, connection failures, SSL failures.)
+            Fails with ``RequestSendFailed`` if there were problems connecting to
+                the remote, due to e.g. DNS failures, connection timeouts etc.
         """
         if timeout:
             _sec_timeout = timeout / 1000
@@ -335,23 +336,74 @@ class MatrixFederationHttpClient(object):
                         reactor=self.hs.get_reactor(),
                     )
 
-                    with Measure(self.clock, "outbound_request"):
-                        response = yield make_deferred_yieldable(
-                            request_deferred,
+                    try:
+                        with Measure(self.clock, "outbound_request"):
+                            response = yield make_deferred_yieldable(
+                                request_deferred,
+                            )
+                    except DNSLookupError as e:
+                        raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+                    except Exception as e:
+                        raise_from(RequestSendFailed(e, can_retry=True), e)
+
+                    logger.info(
+                        "{%s} [%s] Got response headers: %d %s",
+                        request.txn_id,
+                        request.destination,
+                        response.code,
+                        response.phrase.decode('ascii', errors='replace'),
+                    )
+
+                    if 200 <= response.code < 300:
+                        pass
+                    else:
+                        # :'(
+                        # Update transactions table?
+                        d = treq.content(response)
+                        d = timeout_deferred(
+                            d,
+                            timeout=_sec_timeout,
+                            reactor=self.hs.get_reactor(),
                         )
 
+                        try:
+                            body = yield make_deferred_yieldable(d)
+                        except Exception as e:
+                            # Eh, we're already going to raise an exception so lets
+                            # ignore if this fails.
+                            logger.warn(
+                                "{%s} [%s] Failed to get error response: %s %s: %s",
+                                request.txn_id,
+                                request.destination,
+                                request.method,
+                                url_str,
+                                _flatten_response_never_received(e),
+                            )
+                            body = None
+
+                        e = HttpResponseException(
+                            response.code, response.phrase, body
+                        )
+
+                        # Retry if the error is a 429 (Too Many Requests),
+                        # otherwise just raise a standard HttpResponseException
+                        if response.code == 429:
+                            raise_from(RequestSendFailed(e, can_retry=True), e)
+                        else:
+                            raise e
+
                     break
-                except Exception as e:
+                except RequestSendFailed as e:
                     logger.warn(
                         "{%s} [%s] Request failed: %s %s: %s",
                         request.txn_id,
                         request.destination,
                         request.method,
                         url_str,
-                        _flatten_response_never_received(e),
+                        _flatten_response_never_received(e.inner_exception),
                     )
 
-                    if not retry_on_dns_fail and isinstance(e, DNSLookupError):
+                    if not e.can_retry:
                         raise
 
                     if retries_left and not timeout:
@@ -376,29 +428,16 @@ class MatrixFederationHttpClient(object):
                     else:
                         raise
 
-            logger.info(
-                "{%s} [%s] Got response headers: %d %s",
-                request.txn_id,
-                request.destination,
-                response.code,
-                response.phrase.decode('ascii', errors='replace'),
-            )
-
-            if 200 <= response.code < 300:
-                pass
-            else:
-                # :'(
-                # Update transactions table?
-                d = treq.content(response)
-                d = timeout_deferred(
-                    d,
-                    timeout=_sec_timeout,
-                    reactor=self.hs.get_reactor(),
-                )
-                body = yield make_deferred_yieldable(d)
-                raise HttpResponseException(
-                    response.code, response.phrase, body
-                )
+                except Exception as e:
+                    logger.warn(
+                        "{%s} [%s] Request failed: %s %s: %s",
+                        request.txn_id,
+                        request.destination,
+                        request.method,
+                        url_str,
+                        _flatten_response_never_received(e),
+                    )
+                    raise
 
             defer.returnValue(response)
 
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index e117836e9a..bdffa97805 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -30,6 +30,7 @@ from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
     NotFoundError,
+    RequestSendFailed,
     SynapseError,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -372,10 +373,10 @@ class MediaRepository(object):
                         "allow_remote": "false",
                     }
                 )
-            except twisted.internet.error.DNSLookupError as e:
-                logger.warn("HTTP error fetching remote media %s/%s: %r",
+            except RequestSendFailed as e:
+                logger.warn("Request failed fetching remote media %s/%s: %r",
                             server_name, media_id, e)
-                raise NotFoundError()
+                raise SynapseError(502, "Failed to fetch remote media")
 
             except HttpResponseException as e:
                 logger.warn("HTTP error fetching remote media %s/%s: %s",
diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py
index f3cb1423f0..b2e38276d8 100644
--- a/tests/http/test_fedclient.py
+++ b/tests/http/test_fedclient.py
@@ -20,6 +20,7 @@ from twisted.internet.error import ConnectingCancelledError, DNSLookupError
 from twisted.web.client import ResponseNeverReceived
 from twisted.web.http import HTTPChannel
 
+from synapse.api.errors import RequestSendFailed
 from synapse.http.matrixfederationclient import (
     MatrixFederationHttpClient,
     MatrixFederationRequest,
@@ -49,7 +50,8 @@ class FederationClientTests(HomeserverTestCase):
         self.pump()
 
         f = self.failureResultOf(d)
-        self.assertIsInstance(f.value, DNSLookupError)
+        self.assertIsInstance(f.value, RequestSendFailed)
+        self.assertIsInstance(f.value.inner_exception, DNSLookupError)
 
     def test_client_never_connect(self):
         """
@@ -76,7 +78,11 @@ class FederationClientTests(HomeserverTestCase):
         self.reactor.advance(10.5)
         f = self.failureResultOf(d)
 
-        self.assertIsInstance(f.value, (ConnectingCancelledError, TimeoutError))
+        self.assertIsInstance(f.value, RequestSendFailed)
+        self.assertIsInstance(
+            f.value.inner_exception,
+            (ConnectingCancelledError, TimeoutError),
+        )
 
     def test_client_connect_no_response(self):
         """
@@ -107,7 +113,8 @@ class FederationClientTests(HomeserverTestCase):
         self.reactor.advance(10.5)
         f = self.failureResultOf(d)
 
-        self.assertIsInstance(f.value, ResponseNeverReceived)
+        self.assertIsInstance(f.value, RequestSendFailed)
+        self.assertIsInstance(f.value.inner_exception, ResponseNeverReceived)
 
     def test_client_gets_headers(self):
         """