summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/15353.misc1
-rw-r--r--synapse/app/_base.py92
-rw-r--r--synapse/app/generic_worker.py34
-rw-r--r--synapse/app/homeserver.py42
-rw-r--r--synapse/config/server.py118
-rw-r--r--synapse/config/workers.py13
-rw-r--r--synapse/http/site.py27
-rw-r--r--synapse/types/__init__.py2
8 files changed, 239 insertions, 90 deletions
diff --git a/changelog.d/15353.misc b/changelog.d/15353.misc
new file mode 100644
index 0000000000..23927fea8f
--- /dev/null
+++ b/changelog.d/15353.misc
@@ -0,0 +1 @@
+Add experimental support for Unix sockets. Contributed by Jason Little.
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 28062dd69d..f7b866978c 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -41,7 +41,12 @@ from typing_extensions import ParamSpec
 
 import twisted
 from twisted.internet import defer, error, reactor as _reactor
-from twisted.internet.interfaces import IOpenSSLContextFactory, IReactorSSL, IReactorTCP
+from twisted.internet.interfaces import (
+    IOpenSSLContextFactory,
+    IReactorSSL,
+    IReactorTCP,
+    IReactorUNIX,
+)
 from twisted.internet.protocol import ServerFactory
 from twisted.internet.tcp import Port
 from twisted.logger import LoggingFile, LogLevel
@@ -56,7 +61,7 @@ from synapse.app.phone_stats_home import start_phone_stats_home
 from synapse.config import ConfigError
 from synapse.config._base import format_config_error
 from synapse.config.homeserver import HomeServerConfig
-from synapse.config.server import ListenerConfig, ManholeConfig
+from synapse.config.server import ListenerConfig, ManholeConfig, TCPListenerConfig
 from synapse.crypto import context_factory
 from synapse.events.presence_router import load_legacy_presence_router
 from synapse.events.spamcheck import load_legacy_spam_checkers
@@ -351,6 +356,28 @@ def listen_tcp(
     return r  # type: ignore[return-value]
 
 
+def listen_unix(
+    path: str,
+    mode: int,
+    factory: ServerFactory,
+    reactor: IReactorUNIX = reactor,
+    backlog: int = 50,
+) -> List[Port]:
+    """
+    Create a UNIX socket for a given path and 'mode' permission
+
+    Returns:
+        list of twisted.internet.tcp.Port listening for TCP connections
+    """
+    wantPID = True
+
+    return [
+        # IReactorUNIX returns an object implementing IListeningPort from listenUNIX,
+        # but we know it will be a Port instance.
+        cast(Port, reactor.listenUNIX(path, factory, backlog, mode, wantPID))
+    ]
+
+
 def listen_http(
     listener_config: ListenerConfig,
     root_resource: Resource,
@@ -359,18 +386,13 @@ def listen_http(
     context_factory: Optional[IOpenSSLContextFactory],
     reactor: ISynapseReactor = reactor,
 ) -> List[Port]:
-    port = listener_config.port
-    bind_addresses = listener_config.bind_addresses
-    tls = listener_config.tls
-
     assert listener_config.http_options is not None
 
-    site_tag = listener_config.http_options.tag
-    if site_tag is None:
-        site_tag = str(port)
+    site_tag = listener_config.get_site_tag()
 
     site = SynapseSite(
-        "synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
+        "synapse.access.%s.%s"
+        % ("https" if listener_config.is_tls() else "http", site_tag),
         site_tag,
         listener_config,
         root_resource,
@@ -378,25 +400,41 @@ def listen_http(
         max_request_body_size=max_request_body_size,
         reactor=reactor,
     )
-    if tls:
-        # refresh_certificate should have been called before this.
-        assert context_factory is not None
-        ports = listen_ssl(
-            bind_addresses,
-            port,
-            site,
-            context_factory,
-            reactor=reactor,
-        )
-        logger.info("Synapse now listening on TCP port %d (TLS)", port)
+
+    if isinstance(listener_config, TCPListenerConfig):
+        if listener_config.is_tls():
+            # refresh_certificate should have been called before this.
+            assert context_factory is not None
+            ports = listen_ssl(
+                listener_config.bind_addresses,
+                listener_config.port,
+                site,
+                context_factory,
+                reactor=reactor,
+            )
+            logger.info(
+                "Synapse now listening on TCP port %d (TLS)", listener_config.port
+            )
+        else:
+            ports = listen_tcp(
+                listener_config.bind_addresses,
+                listener_config.port,
+                site,
+                reactor=reactor,
+            )
+            logger.info("Synapse now listening on TCP port %d", listener_config.port)
+
     else:
-        ports = listen_tcp(
-            bind_addresses,
-            port,
-            site,
-            reactor=reactor,
+        ports = listen_unix(
+            listener_config.path, listener_config.mode, site, reactor=reactor
         )
-        logger.info("Synapse now listening on TCP port %d", port)
+        # getHost() returns a UNIXAddress which contains an instance variable of 'name'
+        # encoded as a byte string. Decode as utf-8 so pretty.
+        logger.info(
+            "Synapse now listening on Unix Socket at: "
+            f"{ports[0].getHost().name.decode('utf-8')}"
+        )
+
     return ports
 
 
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 0dec24369a..e17ce35b8e 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -38,7 +38,7 @@ from synapse.app._base import (
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
-from synapse.config.server import ListenerConfig
+from synapse.config.server import ListenerConfig, TCPListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.server import JsonResource, OptionsResource
 from synapse.logging.context import LoggingContext
@@ -236,12 +236,18 @@ class GenericWorkerServer(HomeServer):
             if listener.type == "http":
                 self._listen_http(listener)
             elif listener.type == "manhole":
-                _base.listen_manhole(
-                    listener.bind_addresses,
-                    listener.port,
-                    manhole_settings=self.config.server.manhole_settings,
-                    manhole_globals={"hs": self},
-                )
+                if isinstance(listener, TCPListenerConfig):
+                    _base.listen_manhole(
+                        listener.bind_addresses,
+                        listener.port,
+                        manhole_settings=self.config.server.manhole_settings,
+                        manhole_globals={"hs": self},
+                    )
+                else:
+                    raise ConfigError(
+                        "Can not using a unix socket for manhole at this time."
+                    )
+
             elif listener.type == "metrics":
                 if not self.config.metrics.enable_metrics:
                     logger.warning(
@@ -249,10 +255,16 @@ class GenericWorkerServer(HomeServer):
                         "enable_metrics is not True!"
                     )
                 else:
-                    _base.listen_metrics(
-                        listener.bind_addresses,
-                        listener.port,
-                    )
+                    if isinstance(listener, TCPListenerConfig):
+                        _base.listen_metrics(
+                            listener.bind_addresses,
+                            listener.port,
+                        )
+                    else:
+                        raise ConfigError(
+                            "Can not use a unix socket for metrics at this time."
+                        )
+
             else:
                 logger.warning("Unsupported listener type: %s", listener.type)
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index b8830b1a9c..84236ac299 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -44,7 +44,7 @@ from synapse.app._base import (
 )
 from synapse.config._base import ConfigError, format_config_error
 from synapse.config.homeserver import HomeServerConfig
-from synapse.config.server import ListenerConfig
+from synapse.config.server import ListenerConfig, TCPListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
@@ -78,14 +78,13 @@ class SynapseHomeServer(HomeServer):
     DATASTORE_CLASS = DataStore  # type: ignore
 
     def _listener_http(
-        self, config: HomeServerConfig, listener_config: ListenerConfig
+        self,
+        config: HomeServerConfig,
+        listener_config: ListenerConfig,
     ) -> Iterable[Port]:
-        port = listener_config.port
         # Must exist since this is an HTTP listener.
         assert listener_config.http_options is not None
-        site_tag = listener_config.http_options.tag
-        if site_tag is None:
-            site_tag = str(port)
+        site_tag = listener_config.get_site_tag()
 
         # We always include a health resource.
         resources: Dict[str, Resource] = {"/health": HealthResource()}
@@ -252,12 +251,17 @@ class SynapseHomeServer(HomeServer):
                     self._listener_http(self.config, listener)
                 )
             elif listener.type == "manhole":
-                _base.listen_manhole(
-                    listener.bind_addresses,
-                    listener.port,
-                    manhole_settings=self.config.server.manhole_settings,
-                    manhole_globals={"hs": self},
-                )
+                if isinstance(listener, TCPListenerConfig):
+                    _base.listen_manhole(
+                        listener.bind_addresses,
+                        listener.port,
+                        manhole_settings=self.config.server.manhole_settings,
+                        manhole_globals={"hs": self},
+                    )
+                else:
+                    raise ConfigError(
+                        "Can not use a unix socket for manhole at this time."
+                    )
             elif listener.type == "metrics":
                 if not self.config.metrics.enable_metrics:
                     logger.warning(
@@ -265,10 +269,16 @@ class SynapseHomeServer(HomeServer):
                         "enable_metrics is not True!"
                     )
                 else:
-                    _base.listen_metrics(
-                        listener.bind_addresses,
-                        listener.port,
-                    )
+                    if isinstance(listener, TCPListenerConfig):
+                        _base.listen_metrics(
+                            listener.bind_addresses,
+                            listener.port,
+                        )
+                    else:
+                        raise ConfigError(
+                            "Can not use a unix socket for metrics at this time."
+                        )
+
             else:
                 # this shouldn't happen, as the listener type should have been checked
                 # during parsing
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 0e46b849cf..386c3194b8 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -214,17 +214,52 @@ class HttpListenerConfig:
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
-class ListenerConfig:
-    """Object describing the configuration of a single listener."""
+class TCPListenerConfig:
+    """Object describing the configuration of a single TCP listener."""
 
     port: int = attr.ib(validator=attr.validators.instance_of(int))
-    bind_addresses: List[str]
+    bind_addresses: List[str] = attr.ib(validator=attr.validators.instance_of(List))
     type: str = attr.ib(validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
     tls: bool = False
 
     # http_options is only populated if type=http
     http_options: Optional[HttpListenerConfig] = None
 
+    def get_site_tag(self) -> str:
+        """Retrieves http_options.tag if it exists, otherwise the port number."""
+        if self.http_options and self.http_options.tag is not None:
+            return self.http_options.tag
+        else:
+            return str(self.port)
+
+    def is_tls(self) -> bool:
+        return self.tls
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class UnixListenerConfig:
+    """Object describing the configuration of a single Unix socket listener."""
+
+    # Note: unix sockets can not be tls encrypted, so HAVE to be behind a tls-handling
+    # reverse proxy
+    path: str = attr.ib()
+    # A default(0o666) for this is set in parse_listener_def() below
+    mode: int
+    type: str = attr.ib(validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
+
+    # http_options is only populated if type=http
+    http_options: Optional[HttpListenerConfig] = None
+
+    def get_site_tag(self) -> str:
+        return "unix"
+
+    def is_tls(self) -> bool:
+        """Unix sockets can't have TLS"""
+        return False
+
+
+ListenerConfig = Union[TCPListenerConfig, UnixListenerConfig]
+
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class ManholeConfig:
@@ -531,12 +566,12 @@ class ServerConfig(Config):
 
         self.listeners = [parse_listener_def(i, x) for i, x in enumerate(listeners)]
 
-        # no_tls is not really supported any more, but let's grandfather it in
-        # here.
+        # no_tls is not really supported anymore, but let's grandfather it in here.
         if config.get("no_tls", False):
             l2 = []
             for listener in self.listeners:
-                if listener.tls:
+                if isinstance(listener, TCPListenerConfig) and listener.tls:
+                    # Use isinstance() as the assertion this *has* a listener.port
                     logger.info(
                         "Ignoring TLS-enabled listener on port %i due to no_tls",
                         listener.port,
@@ -577,7 +612,7 @@ class ServerConfig(Config):
             )
 
             self.listeners.append(
-                ListenerConfig(
+                TCPListenerConfig(
                     port=bind_port,
                     bind_addresses=[bind_host],
                     tls=True,
@@ -589,7 +624,7 @@ class ServerConfig(Config):
             unsecure_port = config.get("unsecure_port", bind_port - 400)
             if unsecure_port:
                 self.listeners.append(
-                    ListenerConfig(
+                    TCPListenerConfig(
                         port=unsecure_port,
                         bind_addresses=[bind_host],
                         tls=False,
@@ -601,7 +636,7 @@ class ServerConfig(Config):
         manhole = config.get("manhole")
         if manhole:
             self.listeners.append(
-                ListenerConfig(
+                TCPListenerConfig(
                     port=manhole,
                     bind_addresses=["127.0.0.1"],
                     type="manhole",
@@ -648,7 +683,7 @@ class ServerConfig(Config):
             logger.warning(METRICS_PORT_WARNING)
 
             self.listeners.append(
-                ListenerConfig(
+                TCPListenerConfig(
                     port=metrics_port,
                     bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")],
                     type="http",
@@ -724,7 +759,7 @@ class ServerConfig(Config):
             self.delete_stale_devices_after = None
 
     def has_tls_listener(self) -> bool:
-        return any(listener.tls for listener in self.listeners)
+        return any(listener.is_tls() for listener in self.listeners)
 
     def generate_config_section(
         self,
@@ -904,25 +939,25 @@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
         raise ConfigError(DIRECT_TCP_ERROR, ("listeners", str(num), "type"))
 
     port = listener.get("port")
-    if type(port) is not int:
+    socket_path = listener.get("path")
+    # Either a port or a path should be declared at a minimum. Using both would be bad.
+    if port is not None and not isinstance(port, int):
         raise ConfigError("Listener configuration is lacking a valid 'port' option")
+    if socket_path is not None and not isinstance(socket_path, str):
+        raise ConfigError("Listener configuration is lacking a valid 'path' option")
+    if port and socket_path:
+        raise ConfigError(
+            "Can not have both a UNIX socket and an IP/port declared for the same "
+            "resource!"
+        )
+    if port is None and socket_path is None:
+        raise ConfigError(
+            "Must have either a UNIX socket or an IP/port declared for a given "
+            "resource!"
+        )
 
     tls = listener.get("tls", False)
 
-    bind_addresses = listener.get("bind_addresses", [])
-    bind_address = listener.get("bind_address")
-    # if bind_address was specified, add it to the list of addresses
-    if bind_address:
-        bind_addresses.append(bind_address)
-
-    # if we still have an empty list of addresses, use the default list
-    if not bind_addresses:
-        if listener_type == "metrics":
-            # the metrics listener doesn't support IPv6
-            bind_addresses.append("0.0.0.0")
-        else:
-            bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
-
     http_config = None
     if listener_type == "http":
         try:
@@ -932,8 +967,12 @@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
         except ValueError as e:
             raise ConfigError("Unknown listener resource") from e
 
+        # For a unix socket, default x_forwarded to True, as this is the only way of
+        # getting a client IP.
+        # Note: a reverse proxy is required anyway, as there is no way of exposing a
+        # unix socket to the internet.
         http_config = HttpListenerConfig(
-            x_forwarded=listener.get("x_forwarded", False),
+            x_forwarded=listener.get("x_forwarded", (True if socket_path else False)),
             resources=resources,
             additional_resources=listener.get("additional_resources", {}),
             tag=listener.get("tag"),
@@ -941,7 +980,30 @@ def parse_listener_def(num: int, listener: Any) -> ListenerConfig:
             experimental_cors_msc3886=listener.get("experimental_cors_msc3886", False),
         )
 
-    return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
+    if socket_path:
+        # TODO: Add in path validation, like if the directory exists and is writable?
+        # Set a default for the permission, in case it's left out
+        socket_mode = listener.get("mode", 0o666)
+
+        return UnixListenerConfig(socket_path, socket_mode, listener_type, http_config)
+
+    else:
+        assert port is not None
+        bind_addresses = listener.get("bind_addresses", [])
+        bind_address = listener.get("bind_address")
+        # if bind_address was specified, add it to the list of addresses
+        if bind_address:
+            bind_addresses.append(bind_address)
+
+        # if we still have an empty list of addresses, use the default list
+        if not bind_addresses:
+            if listener_type == "metrics":
+                # the metrics listener doesn't support IPv6
+                bind_addresses.append("0.0.0.0")
+            else:
+                bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
+
+        return TCPListenerConfig(port, bind_addresses, listener_type, tls, http_config)
 
 
 _MANHOLE_SETTINGS_SCHEMA = {
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index 2580660b6c..1dfbe27e89 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -19,15 +19,18 @@ from typing import Any, Dict, List, Union
 
 import attr
 
-from synapse.types import JsonDict
-
-from ._base import (
+from synapse.config._base import (
     Config,
     ConfigError,
     RoutableShardedWorkerHandlingConfig,
     ShardedWorkerHandlingConfig,
 )
-from .server import DIRECT_TCP_ERROR, ListenerConfig, parse_listener_def
+from synapse.config.server import (
+    DIRECT_TCP_ERROR,
+    TCPListenerConfig,
+    parse_listener_def,
+)
+from synapse.types import JsonDict
 
 _DEPRECATED_WORKER_DUTY_OPTION_USED = """
 The '%s' configuration option is deprecated and will be removed in a future
@@ -161,7 +164,7 @@ class WorkerConfig(Config):
         manhole = config.get("worker_manhole")
         if manhole:
             self.worker_listeners.append(
-                ListenerConfig(
+                TCPListenerConfig(
                     port=manhole,
                     bind_addresses=["127.0.0.1"],
                     type="manhole",
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 6a1dbf7f33..c530966ef3 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Generator, Optional, Tuple, Union
 import attr
 from zope.interface import implementer
 
+from twisted.internet.address import UNIXAddress
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IAddress, IReactorTime
 from twisted.python.failure import Failure
@@ -257,7 +258,7 @@ class SynapseRequest(Request):
             request_id,
             request=ContextRequest(
                 request_id=request_id,
-                ip_address=self.getClientAddress().host,
+                ip_address=self.get_client_ip_if_available(),
                 site_tag=self.synapse_site.site_tag,
                 # The requester is going to be unknown at this point.
                 requester=None,
@@ -414,7 +415,7 @@ class SynapseRequest(Request):
 
         self.synapse_site.access_logger.debug(
             "%s - %s - Received request: %s %s",
-            self.getClientAddress().host,
+            self.get_client_ip_if_available(),
             self.synapse_site.site_tag,
             self.get_method(),
             self.get_redacted_uri(),
@@ -462,7 +463,7 @@ class SynapseRequest(Request):
             "%s - %s - {%s}"
             " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
             ' %sB %s "%s %s %s" "%s" [%d dbevts]',
-            self.getClientAddress().host,
+            self.get_client_ip_if_available(),
             self.synapse_site.site_tag,
             requester,
             processing_time,
@@ -500,6 +501,26 @@ class SynapseRequest(Request):
 
         return True
 
+    def get_client_ip_if_available(self) -> str:
+        """Logging helper. Return something useful when a client IP is not retrievable
+        from a unix socket.
+
+        In practice, this returns the socket file path on a SynapseRequest if using a
+        unix socket and the normal IP address for TCP sockets.
+
+        """
+        # getClientAddress().host returns a proper IP address for a TCP socket. But
+        # unix sockets have no concept of IP addresses or ports and return a
+        # UNIXAddress containing a 'None' value. In order to get something usable for
+        # logs(where this is used) get the unix socket file. getHost() returns a
+        # UNIXAddress containing a value of the socket file and has an instance
+        # variable of 'name' encoded as a byte string containing the path we want.
+        # Decode to utf-8 so it looks nice.
+        if isinstance(self.getClientAddress(), UNIXAddress):
+            return self.getHost().name.decode("utf-8")
+        else:
+            return self.getClientAddress().host
+
 
 class XForwardedForRequest(SynapseRequest):
     """Request object which honours proxy headers
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index c09b9cf87d..5cee9c3194 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -50,6 +50,7 @@ from twisted.internet.interfaces import (
     IReactorTCP,
     IReactorThreads,
     IReactorTime,
+    IReactorUNIX,
 )
 
 from synapse.api.errors import Codes, SynapseError
@@ -91,6 +92,7 @@ StrCollection = Union[Tuple[str, ...], List[str], AbstractSet[str]]
 class ISynapseReactor(
     IReactorTCP,
     IReactorSSL,
+    IReactorUNIX,
     IReactorPluggableNameResolver,
     IReactorTime,
     IReactorCore,