summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py50
1 files changed, 43 insertions, 7 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index e32e68e8c4..5b5ee2c13e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -30,8 +30,10 @@ from typing import (
 
 from prometheus_client import Counter
 
+from twisted.internet.protocol import ReconnectingClientFactory
+
 from synapse.metrics import LaterGauge
-from synapse.replication.tcp.client import ReplicationClientFactory
+from synapse.replication.tcp.client import DirectTcpReplicationClientFactory
 from synapse.replication.tcp.commands import (
     ClearUserSyncsCommand,
     Command,
@@ -92,7 +94,7 @@ class ReplicationCommandHandler:
         self._pending_batches = {}  # type: Dict[str, List[Any]]
 
         # The factory used to create connections.
-        self._factory = None  # type: Optional[ReplicationClientFactory]
+        self._factory = None  # type: Optional[ReconnectingClientFactory]
 
         # The currently connected connections.
         self._connections = []  # type: List[AbstractConnection]
@@ -119,11 +121,45 @@ class ReplicationCommandHandler:
         """Helper method to start a replication connection to the remote server
         using TCP.
         """
-        client_name = hs.config.worker_name
-        self._factory = ReplicationClientFactory(hs, client_name, self)
-        host = hs.config.worker_replication_host
-        port = hs.config.worker_replication_port
-        hs.get_reactor().connectTCP(host, port, self._factory)
+        if hs.config.redis.redis_enabled:
+            from synapse.replication.tcp.redis import (
+                RedisDirectTcpReplicationClientFactory,
+            )
+            import txredisapi
+
+            logger.info(
+                "Connecting to redis (host=%r port=%r DBID=%r)",
+                hs.config.redis_host,
+                hs.config.redis_port,
+                hs.config.redis_dbid,
+            )
+
+            # We need two connections to redis, one for the subscription stream and
+            # one to send commands to (as you can't send further redis commands to a
+            # connection after SUBSCRIBE is called).
+
+            # First create the connection for sending commands.
+            outbound_redis_connection = txredisapi.lazyConnection(
+                host=hs.config.redis_host,
+                port=hs.config.redis_port,
+                dbid=hs.config.redis_dbid,
+                password=hs.config.redis.redis_password,
+                reconnect=True,
+            )
+
+            # Now create the factory/connection for the subscription stream.
+            self._factory = RedisDirectTcpReplicationClientFactory(
+                hs, outbound_redis_connection
+            )
+            hs.get_reactor().connectTCP(
+                hs.config.redis.redis_host, hs.config.redis.redis_port, self._factory,
+            )
+        else:
+            client_name = hs.config.worker_name
+            self._factory = DirectTcpReplicationClientFactory(hs, client_name, self)
+            host = hs.config.worker_replication_host
+            port = hs.config.worker_replication_port
+            hs.get_reactor().connectTCP(host, port, self._factory)
 
     async def on_REPLICATE(self, cmd: ReplicateCommand):
         # We only want to announce positions by the writer of the streams.