summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/redis.py1
-rw-r--r--synapse/replication/tcp/handler.py10
-rw-r--r--synapse/replication/tcp/redis.py62
-rw-r--r--synapse/server.py42
4 files changed, 92 insertions, 23 deletions
diff --git a/synapse/config/redis.py b/synapse/config/redis.py
index 636cb450b8..3c4c499e22 100644
--- a/synapse/config/redis.py
+++ b/synapse/config/redis.py
@@ -33,6 +33,7 @@ class RedisConfig(Config):
 
         self.redis_host = redis_config.get("host", "localhost")
         self.redis_port = redis_config.get("port", 6379)
+        self.redis_path = redis_config.get("path", None)
         self.redis_dbid = redis_config.get("dbid", None)
         self.redis_password = redis_config.get("password")
 
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 233ad61d49..5d108fe11b 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -352,7 +352,15 @@ class ReplicationCommandHandler:
 
         reactor = hs.get_reactor()
         redis_config = hs.config.redis
-        if hs.config.redis.redis_use_tls:
+        if redis_config.redis_path is not None:
+            reactor.connectUNIX(
+                redis_config.redis_path,
+                self._factory,
+                timeout=30,
+                checkPID=False,
+            )
+
+        elif hs.config.redis.redis_use_tls:
             ssl_context_factory = ClientContextFactory(hs.config.redis)
             reactor.connectSSL(
                 redis_config.redis_host,
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index c8f4bf8b27..7e96145b3b 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -17,7 +17,12 @@ from inspect import isawaitable
 from typing import TYPE_CHECKING, Any, Generic, List, Optional, Type, TypeVar, cast
 
 import attr
-import txredisapi
+from txredisapi import (
+    ConnectionHandler,
+    RedisFactory,
+    SubscriberProtocol,
+    UnixConnectionHandler,
+)
 from zope.interface import implementer
 
 from twisted.internet.address import IPv4Address, IPv6Address
@@ -68,7 +73,7 @@ class ConstantProperty(Generic[T, V]):
 
 
 @implementer(IReplicationConnection)
-class RedisSubscriber(txredisapi.SubscriberProtocol):
+class RedisSubscriber(SubscriberProtocol):
     """Connection to redis subscribed to replication stream.
 
     This class fulfils two functions:
@@ -95,7 +100,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
     synapse_handler: "ReplicationCommandHandler"
     synapse_stream_prefix: str
     synapse_channel_names: List[str]
-    synapse_outbound_redis_connection: txredisapi.ConnectionHandler
+    synapse_outbound_redis_connection: ConnectionHandler
 
     def __init__(self, *args: Any, **kwargs: Any):
         super().__init__(*args, **kwargs)
@@ -229,7 +234,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol):
         )
 
 
-class SynapseRedisFactory(txredisapi.RedisFactory):
+class SynapseRedisFactory(RedisFactory):
     """A subclass of RedisFactory that periodically sends pings to ensure that
     we detect dead connections.
     """
@@ -245,7 +250,7 @@ class SynapseRedisFactory(txredisapi.RedisFactory):
         dbid: Optional[int],
         poolsize: int,
         isLazy: bool = False,
-        handler: Type = txredisapi.ConnectionHandler,
+        handler: Type = ConnectionHandler,
         charset: str = "utf-8",
         password: Optional[str] = None,
         replyTimeout: int = 30,
@@ -326,7 +331,7 @@ class RedisDirectTcpReplicationClientFactory(SynapseRedisFactory):
     def __init__(
         self,
         hs: "HomeServer",
-        outbound_redis_connection: txredisapi.ConnectionHandler,
+        outbound_redis_connection: ConnectionHandler,
         channel_names: List[str],
     ):
         super().__init__(
@@ -368,7 +373,7 @@ def lazyConnection(
     reconnect: bool = True,
     password: Optional[str] = None,
     replyTimeout: int = 30,
-) -> txredisapi.ConnectionHandler:
+) -> ConnectionHandler:
     """Creates a connection to Redis that is lazily set up and reconnects if the
     connections is lost.
     """
@@ -380,7 +385,7 @@ def lazyConnection(
         dbid=dbid,
         poolsize=1,
         isLazy=True,
-        handler=txredisapi.ConnectionHandler,
+        handler=ConnectionHandler,
         password=password,
         replyTimeout=replyTimeout,
     )
@@ -408,3 +413,44 @@ def lazyConnection(
         )
 
     return factory.handler
+
+
+def lazyUnixConnection(
+    hs: "HomeServer",
+    path: str = "/tmp/redis.sock",
+    dbid: Optional[int] = None,
+    reconnect: bool = True,
+    password: Optional[str] = None,
+    replyTimeout: int = 30,
+) -> ConnectionHandler:
+    """Creates a connection to Redis that is lazily set up and reconnects if the
+    connection is lost.
+
+    Returns:
+        A subclass of ConnectionHandler, which is a UnixConnectionHandler in this case.
+    """
+
+    uuid = path
+
+    factory = SynapseRedisFactory(
+        hs,
+        uuid=uuid,
+        dbid=dbid,
+        poolsize=1,
+        isLazy=True,
+        handler=UnixConnectionHandler,
+        password=password,
+        replyTimeout=replyTimeout,
+    )
+    factory.continueTrying = reconnect
+
+    reactor = hs.get_reactor()
+
+    reactor.connectUNIX(
+        path,
+        factory,
+        timeout=30,
+        checkPID=False,
+    )
+
+    return factory.handler
diff --git a/synapse/server.py b/synapse/server.py
index f6e245569c..cce5fb66ff 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -864,22 +864,36 @@ class HomeServer(metaclass=abc.ABCMeta):
 
         # We only want to import redis module if we're using it, as we have
         # `txredisapi` as an optional dependency.
-        from synapse.replication.tcp.redis import lazyConnection
+        from synapse.replication.tcp.redis import lazyConnection, lazyUnixConnection
 
-        logger.info(
-            "Connecting to redis (host=%r port=%r) for external cache",
-            self.config.redis.redis_host,
-            self.config.redis.redis_port,
-        )
+        if self.config.redis.redis_path is None:
+            logger.info(
+                "Connecting to redis (host=%r port=%r) for external cache",
+                self.config.redis.redis_host,
+                self.config.redis.redis_port,
+            )
 
-        return lazyConnection(
-            hs=self,
-            host=self.config.redis.redis_host,
-            port=self.config.redis.redis_port,
-            dbid=self.config.redis.redis_dbid,
-            password=self.config.redis.redis_password,
-            reconnect=True,
-        )
+            return lazyConnection(
+                hs=self,
+                host=self.config.redis.redis_host,
+                port=self.config.redis.redis_port,
+                dbid=self.config.redis.redis_dbid,
+                password=self.config.redis.redis_password,
+                reconnect=True,
+            )
+        else:
+            logger.info(
+                "Connecting to redis (path=%r) for external cache",
+                self.config.redis.redis_path,
+            )
+
+            return lazyUnixConnection(
+                hs=self,
+                path=self.config.redis.redis_path,
+                dbid=self.config.redis.redis_dbid,
+                password=self.config.redis.redis_password,
+                reconnect=True,
+            )
 
     def should_send_federation(self) -> bool:
         "Should this server be sending federation traffic directly?"