diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/http/client.py | 211 | ||||
-rw-r--r-- | synapse/http/matrixfederationclient.py | 74 |
2 files changed, 138 insertions, 147 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index f409368802..e5b13593f2 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -14,9 +14,10 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -import urllib +import urllib.parse from io import BytesIO from typing import ( + TYPE_CHECKING, Any, BinaryIO, Dict, @@ -31,7 +32,7 @@ from typing import ( import treq from canonicaljson import encode_canonical_json -from netaddr import IPAddress +from netaddr import IPAddress, IPSet from prometheus_client import Counter from zope.interface import implementer, provider @@ -39,6 +40,8 @@ from OpenSSL import SSL from OpenSSL.SSL import VERIFY_NONE from twisted.internet import defer, error as twisted_error, protocol, ssl from twisted.internet.interfaces import ( + IAddress, + IHostResolution, IReactorPluggableNameResolver, IResolutionReceiver, ) @@ -53,7 +56,7 @@ from twisted.web.client import ( ) from twisted.web.http import PotentialDataLoss from twisted.web.http_headers import Headers -from twisted.web.iweb import IResponse +from twisted.web.iweb import IAgent, IBodyProducer, IResponse from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.http import QuieterFileBodyProducer, RequestTimedOutError, redact_uri @@ -63,6 +66,9 @@ from synapse.logging.opentracing import set_tag, start_active_span, tags from synapse.util import json_decoder from synapse.util.async_helpers import timeout_deferred +if TYPE_CHECKING: + from synapse.app.homeserver import HomeServer + logger = logging.getLogger(__name__) outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"]) @@ -84,12 +90,19 @@ QueryParamValue = Union[str, bytes, Iterable[Union[str, bytes]]] QueryParams = Union[Mapping[str, QueryParamValue], Mapping[bytes, QueryParamValue]] -def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist): +def check_against_blacklist( + ip_address: IPAddress, ip_whitelist: Optional[IPSet], ip_blacklist: IPSet +) -> bool: """ + Compares an IP address to allowed and disallowed IP sets. + Args: - ip_address (netaddr.IPAddress) - ip_whitelist (netaddr.IPSet) - ip_blacklist (netaddr.IPSet) + ip_address: The IP address to check + ip_whitelist: Allowed IP addresses. + ip_blacklist: Disallowed IP addresses. + + Returns: + True if the IP address is in the blacklist and not in the whitelist. """ if ip_address in ip_blacklist: if ip_whitelist is None or ip_address not in ip_whitelist: @@ -118,23 +131,30 @@ class IPBlacklistingResolver: addresses, preventing DNS rebinding attacks on URL preview. """ - def __init__(self, reactor, ip_whitelist, ip_blacklist): + def __init__( + self, + reactor: IReactorPluggableNameResolver, + ip_whitelist: Optional[IPSet], + ip_blacklist: IPSet, + ): """ Args: - reactor (twisted.internet.reactor) - ip_whitelist (netaddr.IPSet) - ip_blacklist (netaddr.IPSet) + reactor: The twisted reactor. + ip_whitelist: IP addresses to allow. + ip_blacklist: IP addresses to disallow. """ self._reactor = reactor self._ip_whitelist = ip_whitelist self._ip_blacklist = ip_blacklist - def resolveHostName(self, recv, hostname, portNumber=0): + def resolveHostName( + self, recv: IResolutionReceiver, hostname: str, portNumber: int = 0 + ) -> IResolutionReceiver: r = recv() - addresses = [] + addresses = [] # type: List[IAddress] - def _callback(): + def _callback() -> None: r.resolutionBegan(None) has_bad_ip = False @@ -161,15 +181,15 @@ class IPBlacklistingResolver: @provider(IResolutionReceiver) class EndpointReceiver: @staticmethod - def resolutionBegan(resolutionInProgress): + def resolutionBegan(resolutionInProgress: IHostResolution) -> None: pass @staticmethod - def addressResolved(address): + def addressResolved(address: IAddress) -> None: addresses.append(address) @staticmethod - def resolutionComplete(): + def resolutionComplete() -> None: _callback() self._reactor.nameResolver.resolveHostName( @@ -185,19 +205,29 @@ class BlacklistingAgentWrapper(Agent): directly (without an IP address lookup). """ - def __init__(self, agent, reactor, ip_whitelist=None, ip_blacklist=None): + def __init__( + self, + agent: IAgent, + ip_whitelist: Optional[IPSet] = None, + ip_blacklist: Optional[IPSet] = None, + ): """ Args: - agent (twisted.web.client.Agent): The Agent to wrap. - reactor (twisted.internet.reactor) - ip_whitelist (netaddr.IPSet) - ip_blacklist (netaddr.IPSet) + agent: The Agent to wrap. + ip_whitelist: IP addresses to allow. + ip_blacklist: IP addresses to disallow. """ self._agent = agent self._ip_whitelist = ip_whitelist self._ip_blacklist = ip_blacklist - def request(self, method, uri, headers=None, bodyProducer=None): + def request( + self, + method: bytes, + uri: bytes, + headers: Optional[Headers] = None, + bodyProducer: Optional[IBodyProducer] = None, + ) -> defer.Deferred: h = urllib.parse.urlparse(uri.decode("ascii")) try: @@ -226,23 +256,23 @@ class SimpleHttpClient: def __init__( self, - hs, - treq_args={}, - ip_whitelist=None, - ip_blacklist=None, - http_proxy=None, - https_proxy=None, + hs: "HomeServer", + treq_args: Dict[str, Any] = {}, + ip_whitelist: Optional[IPSet] = None, + ip_blacklist: Optional[IPSet] = None, + http_proxy: Optional[bytes] = None, + https_proxy: Optional[bytes] = None, ): """ Args: - hs (synapse.server.HomeServer) - treq_args (dict): Extra keyword arguments to be given to treq.request. - ip_blacklist (netaddr.IPSet): The IP addresses that are blacklisted that + hs + treq_args: Extra keyword arguments to be given to treq.request. + ip_blacklist: The IP addresses that are blacklisted that we may not request. - ip_whitelist (netaddr.IPSet): The whitelisted IP addresses, that we can + ip_whitelist: The whitelisted IP addresses, that we can request if it were otherwise caught in a blacklist. - http_proxy (bytes): proxy server to use for http connections. host[:port] - https_proxy (bytes): proxy server to use for https connections. host[:port] + http_proxy: proxy server to use for http connections. host[:port] + https_proxy: proxy server to use for https connections. host[:port] """ self.hs = hs @@ -306,7 +336,6 @@ class SimpleHttpClient: # by the DNS resolution. self.agent = BlacklistingAgentWrapper( self.agent, - self.reactor, ip_whitelist=self._ip_whitelist, ip_blacklist=self._ip_blacklist, ) @@ -397,7 +426,7 @@ class SimpleHttpClient: async def post_urlencoded_get_json( self, uri: str, - args: Mapping[str, Union[str, List[str]]] = {}, + args: Optional[Mapping[str, Union[str, List[str]]]] = None, headers: Optional[RawHeaders] = None, ) -> Any: """ @@ -422,9 +451,7 @@ class SimpleHttpClient: # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) - query_bytes = urllib.parse.urlencode(encode_urlencode_args(args), True).encode( - "utf8" - ) + query_bytes = encode_query_args(args) actual_headers = { b"Content-Type": [b"application/x-www-form-urlencoded"], @@ -432,7 +459,7 @@ class SimpleHttpClient: b"Accept": [b"application/json"], } if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request( "POST", uri, headers=Headers(actual_headers), data=query_bytes @@ -479,7 +506,7 @@ class SimpleHttpClient: b"Accept": [b"application/json"], } if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request( "POST", uri, headers=Headers(actual_headers), data=json_str @@ -495,7 +522,10 @@ class SimpleHttpClient: ) async def get_json( - self, uri: str, args: QueryParams = {}, headers: Optional[RawHeaders] = None, + self, + uri: str, + args: Optional[QueryParams] = None, + headers: Optional[RawHeaders] = None, ) -> Any: """Gets some json from the given URI. @@ -516,7 +546,7 @@ class SimpleHttpClient: """ actual_headers = {b"Accept": [b"application/json"]} if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore body = await self.get_raw(uri, args, headers=headers) return json_decoder.decode(body.decode("utf-8")) @@ -525,7 +555,7 @@ class SimpleHttpClient: self, uri: str, json_body: Any, - args: QueryParams = {}, + args: Optional[QueryParams] = None, headers: RawHeaders = None, ) -> Any: """Puts some json to the given URI. @@ -546,9 +576,9 @@ class SimpleHttpClient: ValueError: if the response was not JSON """ - if len(args): - query_bytes = urllib.parse.urlencode(args, True) - uri = "%s?%s" % (uri, query_bytes) + if args: + query_str = urllib.parse.urlencode(args, True) + uri = "%s?%s" % (uri, query_str) json_str = encode_canonical_json(json_body) @@ -558,7 +588,7 @@ class SimpleHttpClient: b"Accept": [b"application/json"], } if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request( "PUT", uri, headers=Headers(actual_headers), data=json_str @@ -574,7 +604,10 @@ class SimpleHttpClient: ) async def get_raw( - self, uri: str, args: QueryParams = {}, headers: Optional[RawHeaders] = None + self, + uri: str, + args: Optional[QueryParams] = None, + headers: Optional[RawHeaders] = None, ) -> bytes: """Gets raw text from the given URI. @@ -592,13 +625,13 @@ class SimpleHttpClient: HttpResponseException on a non-2xx HTTP response. """ - if len(args): - query_bytes = urllib.parse.urlencode(args, True) - uri = "%s?%s" % (uri, query_bytes) + if args: + query_str = urllib.parse.urlencode(args, True) + uri = "%s?%s" % (uri, query_str) actual_headers = {b"User-Agent": [self.user_agent]} if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request("GET", uri, headers=Headers(actual_headers)) @@ -641,7 +674,7 @@ class SimpleHttpClient: actual_headers = {b"User-Agent": [self.user_agent]} if headers: - actual_headers.update(headers) + actual_headers.update(headers) # type: ignore response = await self.request("GET", url, headers=Headers(actual_headers)) @@ -649,12 +682,13 @@ class SimpleHttpClient: if ( b"Content-Length" in resp_headers + and max_size and int(resp_headers[b"Content-Length"][0]) > max_size ): - logger.warning("Requested URL is too large > %r bytes" % (self.max_size,)) + logger.warning("Requested URL is too large > %r bytes" % (max_size,)) raise SynapseError( 502, - "Requested file is too large > %r bytes" % (self.max_size,), + "Requested file is too large > %r bytes" % (max_size,), Codes.TOO_LARGE, ) @@ -668,7 +702,7 @@ class SimpleHttpClient: try: length = await make_deferred_yieldable( - _readBodyToFile(response, output_stream, max_size) + readBodyToFile(response, output_stream, max_size) ) except SynapseError: # This can happen e.g. because the body is too large. @@ -696,18 +730,16 @@ def _timeout_to_request_timed_out_error(f: Failure): return f -# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. -# The two should be factored out. - - class _ReadBodyToFileProtocol(protocol.Protocol): - def __init__(self, stream, deferred, max_size): + def __init__( + self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] + ): self.stream = stream self.deferred = deferred self.length = 0 self.max_size = max_size - def dataReceived(self, data): + def dataReceived(self, data: bytes) -> None: self.stream.write(data) self.length += len(data) if self.max_size is not None and self.length >= self.max_size: @@ -721,7 +753,7 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.deferred = defer.Deferred() self.transport.loseConnection() - def connectionLost(self, reason): + def connectionLost(self, reason: Failure) -> None: if reason.check(ResponseDone): self.deferred.callback(self.length) elif reason.check(PotentialDataLoss): @@ -732,35 +764,48 @@ class _ReadBodyToFileProtocol(protocol.Protocol): self.deferred.errback(reason) -# XXX: FIXME: This is horribly copy-pasted from matrixfederationclient. -# The two should be factored out. +def readBodyToFile( + response: IResponse, stream: BinaryIO, max_size: Optional[int] +) -> defer.Deferred: + """ + Read a HTTP response body to a file-object. Optionally enforcing a maximum file size. + Args: + response: The HTTP response to read from. + stream: The file-object to write to. + max_size: The maximum file size to allow. + + Returns: + A Deferred which resolves to the length of the read body. + """ -def _readBodyToFile(response, stream, max_size): d = defer.Deferred() response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size)) return d -def encode_urlencode_args(args): - return {k: encode_urlencode_arg(v) for k, v in args.items()} +def encode_query_args(args: Optional[Mapping[str, Union[str, List[str]]]]) -> bytes: + """ + Encodes a map of query arguments to bytes which can be appended to a URL. + Args: + args: The query arguments, a mapping of string to string or list of strings. + + Returns: + The query arguments encoded as bytes. + """ + if args is None: + return b"" -def encode_urlencode_arg(arg): - if isinstance(arg, str): - return arg.encode("utf-8") - elif isinstance(arg, list): - return [encode_urlencode_arg(i) for i in arg] - else: - return arg + encoded_args = {} + for k, vs in args.items(): + if isinstance(vs, str): + vs = [vs] + encoded_args[k] = [v.encode("utf8") for v in vs] + query_str = urllib.parse.urlencode(encoded_args, True) -def _print_ex(e): - if hasattr(e, "reasons") and e.reasons: - for ex in e.reasons: - _print_ex(ex) - else: - logger.exception(e) + return query_str.encode("utf8") class InsecureInterceptableContextFactory(ssl.ContextFactory): diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index b2ccae90df..4e27f93b7a 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -19,7 +19,7 @@ import random import sys import urllib.parse from io import BytesIO -from typing import BinaryIO, Callable, Dict, List, Optional, Tuple, Union +from typing import Callable, Dict, List, Optional, Tuple, Union import attr import treq @@ -28,26 +28,27 @@ from prometheus_client import Counter from signedjson.sign import sign_json from zope.interface import implementer -from twisted.internet import defer, protocol +from twisted.internet import defer from twisted.internet.error import DNSLookupError from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator -from twisted.python.failure import Failure -from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers from twisted.web.iweb import IBodyProducer, IResponse import synapse.metrics import synapse.util.retryutils from synapse.api.errors import ( - Codes, FederationDeniedError, HttpResponseException, RequestSendFailed, - SynapseError, ) from synapse.http import QuieterFileBodyProducer -from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver +from synapse.http.client import ( + BlacklistingAgentWrapper, + IPBlacklistingResolver, + encode_query_args, + readBodyToFile, +) from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent from synapse.logging.context import make_deferred_yieldable from synapse.logging.opentracing import ( @@ -250,9 +251,7 @@ class MatrixFederationHttpClient: # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names self.agent = BlacklistingAgentWrapper( - self.agent, - self.reactor, - ip_blacklist=hs.config.federation_ip_range_blacklist, + self.agent, ip_blacklist=hs.config.federation_ip_range_blacklist, ) self.clock = hs.get_clock() @@ -986,7 +985,7 @@ class MatrixFederationHttpClient: headers = dict(response.headers.getAllRawHeaders()) try: - d = _readBodyToFile(response, output_stream, max_size) + d = readBodyToFile(response, output_stream, max_size) d.addTimeout(self.default_timeout, self.reactor) length = await make_deferred_yieldable(d) except Exception as e: @@ -1010,44 +1009,6 @@ class MatrixFederationHttpClient: return (length, headers) -class _ReadBodyToFileProtocol(protocol.Protocol): - def __init__( - self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int] - ): - self.stream = stream - self.deferred = deferred - self.length = 0 - self.max_size = max_size - - def dataReceived(self, data: bytes) -> None: - self.stream.write(data) - self.length += len(data) - if self.max_size is not None and self.length >= self.max_size: - self.deferred.errback( - SynapseError( - 502, - "Requested file is too large > %r bytes" % (self.max_size,), - Codes.TOO_LARGE, - ) - ) - self.deferred = defer.Deferred() - self.transport.loseConnection() - - def connectionLost(self, reason: Failure) -> None: - if reason.check(ResponseDone): - self.deferred.callback(self.length) - else: - self.deferred.errback(reason) - - -def _readBodyToFile( - response: IResponse, stream: BinaryIO, max_size: Optional[int] -) -> defer.Deferred: - d = defer.Deferred() - response.deliverBody(_ReadBodyToFileProtocol(stream, d, max_size)) - return d - - def _flatten_response_never_received(e): if hasattr(e, "reasons"): reasons = ", ".join( @@ -1088,18 +1049,3 @@ def check_content_type_is_json(headers: Headers) -> None: ), can_retry=False, ) - - -def encode_query_args(args: Optional[QueryArgs]) -> bytes: - if args is None: - return b"" - - encoded_args = {} - for k, vs in args.items(): - if isinstance(vs, str): - vs = [vs] - encoded_args[k] = [v.encode("utf8") for v in vs] - - query_str = urllib.parse.urlencode(encoded_args, True) - - return query_str.encode("utf8") |