diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e32e68e8c4..5b5ee2c13e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -30,8 +30,10 @@ from typing import (
from prometheus_client import Counter
+from twisted.internet.protocol import ReconnectingClientFactory
+
from synapse.metrics import LaterGauge
-from synapse.replication.tcp.client import ReplicationClientFactory
+from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
from synapse.replication.tcp.commands import (
ClearUserSyncsCommand,
Command,
@@ -92,7 +94,7 @@ class ReplicationCommandHandler:
self._pending_batches = {} # type: Dict[str, List[Any]]
# The factory used to create connections.
- self._factory = None # type: Optional[ReplicationClientFactory]
+ self._factory = None # type: Optional[ReconnectingClientFactory]
# The currently connected connections.
self._connections = [] # type: List[AbstractConnection]
@@ -119,11 +121,45 @@ class ReplicationCommandHandler:
"""Helper method to start a replication connection to the remote server
using TCP.
"""
- client_name = hs.config.worker_name
- self._factory = ReplicationClientFactory(hs, client_name, self)
- host = hs.config.worker_replication_host
- port = hs.config.worker_replication_port
- hs.get_reactor().connectTCP(host, port, self._factory)
+ if hs.config.redis.redis_enabled:
+ from synapse.replication.tcp.redis import (
+ RedisDirectTcpReplicationClientFactory,
+ )
+ import txredisapi
+
+ logger.info(
+ "Connecting to redis (host=%r port=%r DBID=%r)",
+ hs.config.redis_host,
+ hs.config.redis_port,
+ hs.config.redis_dbid,
+ )
+
+ # We need two connections to redis, one for the subscription stream and
+ # one to send commands to (as you can't send further redis commands to a
+ # connection after SUBSCRIBE is called).
+
+ # First create the connection for sending commands.
+ outbound_redis_connection = txredisapi.lazyConnection(
+ host=hs.config.redis_host,
+ port=hs.config.redis_port,
+ dbid=hs.config.redis_dbid,
+ password=hs.config.redis.redis_password,
+ reconnect=True,
+ )
+
+ # Now create the factory/connection for the subscription stream.
+ self._factory = RedisDirectTcpReplicationClientFactory(
+ hs, outbound_redis_connection
+ )
+ hs.get_reactor().connectTCP(
+ hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
+ )
+ else:
+ client_name = hs.config.worker_name
+ self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
+ host = hs.config.worker_replication_host
+ port = hs.config.worker_replication_port
+ hs.get_reactor().connectTCP(host, port, self._factory)
async def on_REPLICATE(self, cmd: ReplicateCommand):
# We only want to announce positions by the writer of the streams.
|