summary refs log tree commit diff
path: root/synapse/config/tls.py
blob: 2a5be707ff37f31696dd83694fde7fb86ad4ea8a (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#

import logging
from typing import Any, List, Optional, Pattern

from matrix_common.regex import glob_to_regex

from OpenSSL import SSL, crypto
from twisted.internet._sslverify import Certificate, trustRootFromCertificates

from synapse.config._base import Config, ConfigError
from synapse.types import JsonDict

logger = logging.getLogger(__name__)


class TlsConfig(Config):
    section = "tls"

    def read_config(self, config: JsonDict, **kwargs: Any) -> None:
        self.tls_certificate_file = self.abspath(config.get("tls_certificate_path"))
        self.tls_private_key_file = self.abspath(config.get("tls_private_key_path"))

        if self.root.server.has_tls_listener():
            if not self.tls_certificate_file:
                raise ConfigError(
                    "tls_certificate_path must be specified if TLS-enabled listeners are "
                    "configured."
                )
            if not self.tls_private_key_file:
                raise ConfigError(
                    "tls_private_key_path must be specified if TLS-enabled listeners are "
                    "configured."
                )

        # Whether to verify certificates on outbound federation traffic
        self.federation_verify_certificates = config.get(
            "federation_verify_certificates", True
        )

        # Minimum TLS version to use for outbound federation traffic
        self.federation_client_minimum_tls_version = str(
            config.get("federation_client_minimum_tls_version", 1)
        )

        if self.federation_client_minimum_tls_version not in ["1", "1.1", "1.2", "1.3"]:
            raise ConfigError(
                "federation_client_minimum_tls_version must be one of: 1, 1.1, 1.2, 1.3"
            )

        # Prevent people shooting themselves in the foot here by setting it to
        # the biggest number blindly
        if self.federation_client_minimum_tls_version == "1.3":
            if getattr(SSL, "OP_NO_TLSv1_3", None) is None:
                raise ConfigError(
                    "federation_client_minimum_tls_version cannot be 1.3, "
                    "your OpenSSL does not support it"
                )

        # Whitelist of domains to not verify certificates for
        fed_whitelist_entries = config.get(
            "federation_certificate_verification_whitelist", []
        )
        if fed_whitelist_entries is None:
            fed_whitelist_entries = []

        # Support globs (*) in whitelist values
        self.federation_certificate_verification_whitelist: List[Pattern] = []
        for entry in fed_whitelist_entries:
            try:
                entry_regex = glob_to_regex(entry.encode("ascii").decode("ascii"))
            except UnicodeEncodeError:
                raise ConfigError(
                    "IDNA domain names are not allowed in the "
                    "federation_certificate_verification_whitelist: %s" % (entry,)
                )

            # Convert globs to regex
            self.federation_certificate_verification_whitelist.append(entry_regex)

        # List of custom certificate authorities for federation traffic validation
        custom_ca_list = config.get("federation_custom_ca_list", None)

        # Read in and parse custom CA certificates
        self.federation_ca_trust_root = None
        if custom_ca_list is not None:
            if len(custom_ca_list) == 0:
                # A trustroot cannot be generated without any CA certificates.
                # Raise an error if this option has been specified without any
                # corresponding certificates.
                raise ConfigError(
                    "federation_custom_ca_list specified without "
                    "any certificate files"
                )

            certs = []
            for ca_file in custom_ca_list:
                logger.debug("Reading custom CA certificate file: %s", ca_file)
                content = self.read_file(ca_file, "federation_custom_ca_list")

                # Parse the CA certificates
                try:
                    cert_base = Certificate.loadPEM(content)
                    certs.append(cert_base)
                except Exception as e:
                    raise ConfigError(
                        "Error parsing custom CA certificate file %s: %s" % (ca_file, e)
                    )

            self.federation_ca_trust_root = trustRootFromCertificates(certs)

        # This config option applies to non-federation HTTP clients
        # (e.g. for talking to recaptcha, identity servers, and such)
        # It should never be used in production, and is intended for
        # use only when running tests.
        self.use_insecure_ssl_client_just_for_testing_do_not_use = config.get(
            "use_insecure_ssl_client_just_for_testing_do_not_use"
        )

        self.tls_certificate: Optional[crypto.X509] = None
        self.tls_private_key: Optional[crypto.PKey] = None

    def read_certificate_from_disk(self) -> None:
        """
        Read the certificates and private key from disk.
        """
        self.tls_private_key = self.read_tls_private_key()
        self.tls_certificate = self.read_tls_certificate()

    def generate_config_section(
        self,
        tls_certificate_path: Optional[str],
        tls_private_key_path: Optional[str],
        **kwargs: Any,
    ) -> str:
        """If the TLS paths are not specified the default will be certs in the
        config directory"""

        if bool(tls_certificate_path) != bool(tls_private_key_path):
            raise ConfigError(
                "Please specify both a cert path and a key path or neither."
            )

        if tls_certificate_path and tls_private_key_path:
            return f"""\
                tls_certificate_path: {tls_certificate_path}
                tls_private_key_path: {tls_private_key_path}
                """
        else:
            return ""

    def read_tls_certificate(self) -> crypto.X509:
        """Reads the TLS certificate from the configured file, and returns it

        Returns:
            The certificate
        """
        cert_path = self.tls_certificate_file
        logger.info("Loading TLS certificate from %s", cert_path)
        cert_pem = self.read_file(cert_path, "tls_certificate_path")
        cert = crypto.load_certificate(crypto.FILETYPE_PEM, cert_pem.encode())

        return cert

    def read_tls_private_key(self) -> crypto.PKey:
        """Reads the TLS private key from the configured file, and returns it

        Returns:
            The private key
        """
        private_key_path = self.tls_private_key_file
        logger.info("Loading TLS key from %s", private_key_path)
        private_key_pem = self.read_file(private_key_path, "tls_private_key_path")
        return crypto.load_privatekey(crypto.FILETYPE_PEM, private_key_pem)