diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index fdd087683b..89f8af0f36 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -15,8 +15,9 @@
import logging
from inspect import isawaitable
-from typing import TYPE_CHECKING, Optional, Type, cast
+from typing import TYPE_CHECKING, Generic, Optional, Type, TypeVar, cast
+import attr
import txredisapi
from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -42,6 +43,24 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+T = TypeVar("T")
+V = TypeVar("V")
+
+
+@attr.s
+class ConstantProperty(Generic[T, V]):
+ """A descriptor that returns the given constant, ignoring attempts to set
+ it.
+ """
+
+ constant = attr.ib() # type: V
+
+ def __get__(self, obj: Optional[T], objtype: Type[T] = None) -> V:
+ return self.constant
+
+ def __set__(self, obj: Optional[T], value: V):
+ pass
+
class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
"""Connection to redis subscribed to replication stream.
@@ -195,6 +214,10 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
we detect dead connections.
"""
+ # We want to *always* retry connecting, txredisapi will stop if there is a
+ # failure during certain operations, e.g. during AUTH.
+ continueTrying = cast(bool, ConstantProperty(True))
+
def __init__(
self,
hs: "HomeServer",
@@ -243,7 +266,6 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
"""
maxDelay = 5
- continueTrying = True
protocol = RedisSubscriber
def __init__(
|