summary refs log tree commit diff
path: root/synapse/http/proxy.py
blob: 0874d67760ae7e9adcf046422d0cd4c34cf1662c (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
#  Copyright 2023 The Matrix.org Foundation C.I.C.
#
#  Licensed under the Apache License, Version 2.0 (the "License");
#  you may not use this file except in compliance with the License.
#  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
#  limitations under the License.
#

import json
import logging
import urllib.parse
from typing import TYPE_CHECKING, Any, Optional, Set, Tuple, cast

from twisted.internet import protocol
from twisted.internet.interfaces import ITCPTransport
from twisted.internet.protocol import connectionDone
from twisted.python import failure
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone
from twisted.web.http_headers import Headers
from twisted.web.iweb import IAgent, IResponse
from twisted.web.resource import IResource
from twisted.web.server import Site

from synapse.api.errors import Codes
from synapse.http import QuieterFileBodyProducer
from synapse.http.server import _AsyncResource
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.types import ISynapseReactor
from synapse.util.async_helpers import timeout_deferred

if TYPE_CHECKING:
    from synapse.http.site import SynapseRequest

logger = logging.getLogger(__name__)

# "Hop-by-hop" headers (as opposed to "end-to-end" headers) as defined by RFC2616
# section 13.5.1 and referenced in RFC9110 section 7.6.1. These are meant to only be
# consumed by the immediate recipient and not be forwarded on.
#
# NOTE: these are spelled in twisted's canonical capitalisation so that they can be
# compared directly against the header names yielded by
# `Headers.getAllRawHeaders()` (see `ProxyResource._send_response`).
HOP_BY_HOP_HEADERS = {
    "Connection",
    "Keep-Alive",
    "Proxy-Authenticate",
    "Proxy-Authorization",
    "TE",
    "Trailers",
    "Transfer-Encoding",
    "Upgrade",
}


def parse_connection_header_value(
    connection_header_value: Optional[bytes],
) -> Set[str]:
    """
    Parse the `Connection` header to determine which headers we should not be copied
    over from the remote response.

    As defined by RFC2616 section 14.10 and RFC9110 section 7.6.1

    Example: `Connection: close, X-Foo, X-Bar` will return `{"Close", "X-Foo", "X-Bar"}`

    Even though "close" is a special directive, let's just treat it as just another
    header for simplicity. If people want to check for this directive, they can simply
    check for `"Close" in headers`.

    Args:
        connection_header_value: The value of the `Connection` header.

    Returns:
        The set of header names that should not be copied over from the remote response.
        The keys are capitalized in canonical capitalization.
    """
    # Nothing listed means nothing extra to strip.
    if not connection_header_value:
        return set()

    # Use twisted's canonicalisation so the resulting names compare equal to the
    # keys produced by `Headers.getAllRawHeaders()`.
    canonicalize = Headers()._canonicalNameCaps
    return {
        canonicalize(option.strip()).decode("ascii")
        for option in connection_header_value.split(b",")
    }


class ProxyResource(_AsyncResource):
    """
    A stub resource that proxies any requests with a `matrix-federation://` scheme
    through the given `federation_agent` to the remote homeserver and ferries back the
    info.
    """

    # No child resources: this resource handles the whole request path itself.
    isLeaf = True

    def __init__(self, reactor: ISynapseReactor, federation_agent: IAgent):
        # NOTE(review): positional flag passed through to `_AsyncResource`;
        # presumably enables tracing-context extraction from incoming request
        # headers — confirm against `_AsyncResource.__init__`.
        super().__init__(True)

        self.reactor = reactor
        self.agent = federation_agent

    async def _async_render(self, request: "SynapseRequest") -> Tuple[int, Any]:
        """
        Forward the incoming request to the remote homeserver via the federation
        agent.

        Returns:
            A tuple of the remote response's HTTP status code and the (unconsumed)
            `IResponse` itself; the body is streamed out later by `_send_response`.
        """
        uri = urllib.parse.urlparse(request.uri)
        # `ProxySite.getResourceFor` only routes `matrix-federation://` URIs here.
        assert uri.scheme == b"matrix-federation"

        # Only forward a whitelist of request headers; everything else (including
        # any hop-by-hop headers from the client) is dropped.
        headers = Headers()
        for header_name in (b"User-Agent", b"Authorization", b"Content-Type"):
            header_value = request.getHeader(header_name)
            if header_value:
                headers.addRawHeader(header_name, header_value)

        # Run in the background so the request isn't tied to the current
        # logcontext while we await it.
        request_deferred = run_in_background(
            self.agent.request,
            request.method,
            request.uri,
            headers=headers,
            # Re-stream the incoming request body to the remote server.
            bodyProducer=QuieterFileBodyProducer(request.content),
        )
        request_deferred = timeout_deferred(
            request_deferred,
            # This should be set longer than the timeout in `MatrixFederationHttpClient`
            # so that it has enough time to complete and pass us the data before we give
            # up.
            timeout=90,
            reactor=self.reactor,
        )

        response = await make_deferred_yieldable(request_deferred)

        return response.code, response

    def _send_response(
        self,
        request: "SynapseRequest",
        code: int,
        response_object: Any,
    ) -> None:
        """
        Copy the remote response's status, (non-hop-by-hop) headers, and body back
        out to the local request. The body is streamed asynchronously via
        `_ProxyResponseBody`, which also finishes the request.
        """
        # `_async_render` always returns an `IResponse` as the response object.
        response = cast(IResponse, response_object)
        response_headers = cast(Headers, response.headers)

        request.setResponseCode(code)

        # The `Connection` header also defines which headers should not be copied over.
        connection_header = response_headers.getRawHeaders(b"connection")
        extra_headers_to_remove = parse_connection_header_value(
            connection_header[0] if connection_header else None
        )

        # Copy headers.
        for k, v in response_headers.getAllRawHeaders():
            # Do not copy over any hop-by-hop headers. These are meant to only be
            # consumed by the immediate recipient and not be forwarded on.
            header_key = k.decode("ascii")
            if (
                header_key in HOP_BY_HOP_HEADERS
                or header_key in extra_headers_to_remove
            ):
                continue

            request.responseHeaders.setRawHeaders(k, v)

        # Stream the remote body out to the local request; `_ProxyResponseBody`
        # calls `request.finish()` (or aborts) when the remote body ends.
        response.deliverBody(_ProxyResponseBody(request))

    def _send_error_response(
        self,
        f: failure.Failure,
        request: "SynapseRequest",
    ) -> None:
        """
        Report a proxying failure to the local client as a 502 with a JSON body
        describing the failed request.
        """
        request.setResponseCode(502)
        request.setHeader(b"Content-Type", b"application/json")
        request.write(
            (
                json.dumps(
                    {
                        "errcode": Codes.UNKNOWN,
                        "err": "ProxyResource: Error when proxying request: %s %s -> %s"
                        % (
                            request.method.decode("ascii"),
                            request.uri.decode("ascii"),
                            f,
                        ),
                    }
                )
            ).encode()
        )
        request.finish()


class _ProxyResponseBody(protocol.Protocol):
    """
    A protocol that proxies the given remote response data back out to the given local
    request.
    """

    transport: Optional[ITCPTransport] = None

    def __init__(self, request: "SynapseRequest") -> None:
        # The local request that the remote body is streamed out to.
        self._local_request = request

    def dataReceived(self, data: bytes) -> None:
        local_request = self._local_request

        # If the local client has already gone away there is no point forwarding
        # the remaining body; forcefully drop the remote connection instead,
        # since all the data would be discarded anyway.
        if local_request._disconnected and self.transport is not None:
            self.transport.abortConnection()
            return

        local_request.write(data)

    def connectionLost(self, reason: Failure = connectionDone) -> None:
        # If the local request is already finished (successfully or failed), there
        # is nothing left to send back.
        if self._local_request.finished:
            return

        if reason.check(ResponseDone) is not None:
            # Remote body completed cleanly: finish the local response.
            self._local_request.finish()
            return

        # The remote request failed part-way through, so abort the local one too.
        self._local_request.transport.abortConnection()


class ProxySite(Site):
    """
    Proxies any requests with a `matrix-federation://` scheme through the given
    `federation_agent`. Otherwise, behaves like a normal `Site`.
    """

    def __init__(
        self,
        resource: IResource,
        reactor: ISynapseReactor,
        federation_agent: IAgent,
    ):
        super().__init__(resource, reactor=reactor)

        # Single shared resource that handles every proxied federation request.
        self._proxy_resource = ProxyResource(reactor, federation_agent)

    def getResourceFor(self, request: "SynapseRequest") -> IResource:
        scheme = urllib.parse.urlparse(request.uri).scheme
        if scheme != b"matrix-federation":
            # Not a proxied request: fall back to normal resource traversal.
            return super().getResourceFor(request)

        return self._proxy_resource