diff --git a/changelog.d/7401.feature b/changelog.d/7401.feature
new file mode 100644
index 0000000000..ce6140fdd1
--- /dev/null
+++ b/changelog.d/7401.feature
@@ -0,0 +1 @@
+Add support for running replication over Redis when using workers.
diff --git a/stubs/txredisapi.pyi b/stubs/txredisapi.pyi
index 763d3fb404..cac689d4f3 100644
--- a/stubs/txredisapi.pyi
+++ b/stubs/txredisapi.pyi
@@ -22,7 +22,10 @@ class RedisProtocol:
def publish(self, channel: str, message: bytes): ...
class SubscriberProtocol:
+ password: Optional[str]
def subscribe(self, channels: Union[str, List[str]]): ...
+ def connectionMade(self): ...
+ def connectionLost(self, reason): ...
def lazyConnection(
host: str = ...,
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 617e860f95..41c623d737 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -61,6 +61,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
outbound_redis_connection = None # type: txredisapi.RedisProtocol
def connectionMade(self):
+ super().connectionMade()
logger.info("Connected to redis instance")
self.subscribe(self.stream_name)
self.send_command(ReplicateCommand())
@@ -119,6 +120,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
logger.warning("Unhandled command: %r", cmd)
def connectionLost(self, reason):
+ super().connectionLost(reason)
logger.info("Lost connection to redis instance")
self.handler.lost_connection(self)
@@ -189,5 +191,6 @@ class RedisDirectTcpReplicationClientFactory(txredisapi.SubscriberFactory):
p.handler = self.handler
p.outbound_redis_connection = self.outbound_redis_connection
p.stream_name = self.stream_name
+ p.password = self.password
return p
|