summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/replication/http/_base.py47
1 files changed, 36 insertions, 11 deletions
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e697c74a6..f1abb98653 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
 
 from prometheus_client import Counter, Gauge
 
+from twisted.internet.error import ConnectError, DNSLookupError
 from twisted.web.server import Request
 
 from synapse.api.errors import HttpResponseException, SynapseError
@@ -87,6 +88,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
             `_handle_request` must return a Deferred.
         RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
             is received.
+        RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
+            a connection error is received.
+        RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
+            receiving connection errors, each will backoff exponentially longer.
     """
 
     NAME: str = abc.abstractproperty()  # type: ignore
@@ -94,6 +99,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
     METHOD = "POST"
     CACHE = True
     RETRY_ON_TIMEOUT = True
+    RETRY_ON_CONNECT_ERROR = True
+    RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5  # =63s (2^6-1)
 
     def __init__(self, hs: "HomeServer"):
         if self.CACHE:
@@ -236,18 +243,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                     "/".join(url_args),
                 )
 
+                headers: Dict[bytes, List[bytes]] = {}
+                # Add an authorization header, if configured.
+                if replication_secret:
+                    headers[b"Authorization"] = [b"Bearer " + replication_secret]
+                opentracing.inject_header_dict(headers, check_destination=False)
+
                 try:
+                    # Keep track of attempts made so we can bail if we don't manage to
+                    # connect to the target after N tries.
+                    attempts = 0
                     # We keep retrying the same request for timeouts. This is so that we
                     # have a good idea that the request has either succeeded or failed
                     # on the master, and so whether we should clean up or not.
                     while True:
-                        headers: Dict[bytes, List[bytes]] = {}
-                        # Add an authorization header, if configured.
-                        if replication_secret:
-                            headers[b"Authorization"] = [
-                                b"Bearer " + replication_secret
-                            ]
-                        opentracing.inject_header_dict(headers, check_destination=False)
                         try:
                             result = await request_func(uri, data, headers=headers)
                             break
@@ -255,11 +264,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
                             if not cls.RETRY_ON_TIMEOUT:
                                 raise
 
-                        logger.warning("%s request timed out; retrying", cls.NAME)
+                            logger.warning("%s request timed out; retrying", cls.NAME)
+
+                            # If we timed out we probably don't need to worry about backing
+                            # off too much, but lets just wait a little anyway.
+                            await clock.sleep(1)
+                        except (ConnectError, DNSLookupError) as e:
+                            if not cls.RETRY_ON_CONNECT_ERROR:
+                                raise
+                            if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
+                                raise
+
+                            delay = 2 ** attempts
+                            logger.warning(
+                                "%s request connection failed; retrying in %ds: %r",
+                                cls.NAME,
+                                delay,
+                                e,
+                            )
 
-                        # If we timed out we probably don't need to worry about backing
-                        # off too much, but lets just wait a little anyway.
-                        await clock.sleep(1)
+                            await clock.sleep(delay)
+                            attempts += 1
                 except HttpResponseException as e:
                     # We convert to SynapseError as we know that it was a SynapseError
                     # on the main process that we should send to the client. (And