diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 2e697c74a6..f1abb98653 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -21,6 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, List, Tuple
from prometheus_client import Counter, Gauge
+from twisted.internet.error import ConnectError, DNSLookupError
from twisted.web.server import Request
from synapse.api.errors import HttpResponseException, SynapseError
@@ -87,6 +88,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
`_handle_request` must return a Deferred.
RETRY_ON_TIMEOUT(bool): Whether or not to retry the request when a 504
is received.
+ RETRY_ON_CONNECT_ERROR (bool): Whether or not to retry the request when
+ a connection error is received.
+ RETRY_ON_CONNECT_ERROR_ATTEMPTS (int): Number of attempts to retry when
+ receiving connection errors; each retry backs off exponentially longer.
"""
NAME: str = abc.abstractproperty() # type: ignore
@@ -94,6 +99,8 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
METHOD = "POST"
CACHE = True
RETRY_ON_TIMEOUT = True
+ RETRY_ON_CONNECT_ERROR = True
+ RETRY_ON_CONNECT_ERROR_ATTEMPTS = 5 # =63s total backoff (1+2+...+32 = 2^6-1)
def __init__(self, hs: "HomeServer"):
if self.CACHE:
@@ -236,18 +243,20 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
"/".join(url_args),
)
+ headers: Dict[bytes, List[bytes]] = {}
+ # Add an authorization header, if configured.
+ if replication_secret:
+ headers[b"Authorization"] = [b"Bearer " + replication_secret]
+ opentracing.inject_header_dict(headers, check_destination=False)
+
try:
+ # Keep track of attempts made so we can bail if we don't manage to
+ # connect to the target after N tries.
+ attempts = 0
# We keep retrying the same request for timeouts. This is so that we
# have a good idea that the request has either succeeded or failed
# on the master, and so whether we should clean up or not.
while True:
- headers: Dict[bytes, List[bytes]] = {}
- # Add an authorization header, if configured.
- if replication_secret:
- headers[b"Authorization"] = [
- b"Bearer " + replication_secret
- ]
- opentracing.inject_header_dict(headers, check_destination=False)
try:
result = await request_func(uri, data, headers=headers)
break
@@ -255,11 +264,27 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
if not cls.RETRY_ON_TIMEOUT:
raise
- logger.warning("%s request timed out; retrying", cls.NAME)
+ logger.warning("%s request timed out; retrying", cls.NAME)
+
+ # If we timed out we probably don't need to worry about backing
+ # off too much, but let's just wait a little anyway.
+ await clock.sleep(1)
+ except (ConnectError, DNSLookupError) as e:
+ if not cls.RETRY_ON_CONNECT_ERROR:
+ raise
+ if attempts > cls.RETRY_ON_CONNECT_ERROR_ATTEMPTS:
+ raise
+
+ delay = 2 ** attempts
+ logger.warning(
+ "%s request connection failed; retrying in %ds: %r",
+ cls.NAME,
+ delay,
+ e,
+ )
- # If we timed out we probably don't need to worry about backing
- # off too much, but lets just wait a little anyway.
- await clock.sleep(1)
+ await clock.sleep(delay)
+ attempts += 1
except HttpResponseException as e:
# We convert to SynapseError as we know that it was a SynapseError
# on the main process that we should send to the client. (And
|