summary refs log tree commit diff
path: root/synapse/http/matrixfederationclient.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-08-26 12:22:25 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2020-08-26 12:22:25 +0100
commit7affcd01c76f495dfe70dbb9f68d964a2d58b9bd (patch)
tree7a42640f7b1c7bd068332a4fd9dce3c2a0dcecd6 /synapse/http/matrixfederationclient.py
parentSimplify medium and address assignment (diff)
parentAdd functions to `MultiWriterIdGen` used by events stream (#8164) (diff)
downloadsynapse-github/anoa/user_param_ui_auth.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into anoa/user_param_ui_auth github/anoa/user_param_ui_auth anoa/user_param_ui_auth
* 'develop' of github.com:matrix-org/synapse: (369 commits)
  Add functions to `MultiWriterIdGen` used by events stream (#8164)
  Do not allow send_nonmember_event to be called with shadow-banned users. (#8158)
  Changelog fixes
  1.19.1rc1
  Make StreamIdGen `get_next` and `get_next_mult` async  (#8161)
  Wording fixes to 'name' user admin api filter (#8163)
  Fix missing double-backtick in RST document
  Search in columns 'name' and 'displayname' in the admin users endpoint (#7377)
  Add type hints for state. (#8140)
  Stop shadow-banned users from sending non-member events. (#8142)
  Allow capping a room's retention policy (#8104)
  Add healthcheck for default localhost 8008 port on /health endpoint. (#8147)
  Fix flaky shadow-ban tests. (#8152)
  Fix join ratelimiter breaking profile updates and idempotency (#8153)
  Do not apply ratelimiting on joins to appservices (#8139)
  Don't fail /submit_token requests on incorrect session ID if request_token_inhibit_3pid_errors is turned on (#7991)
  Do not apply ratelimiting on joins to appservices (#8139)
  Micro-optimisations to get_auth_chain_ids (#8132)
  Allow denying or shadow banning registrations via the spam checker (#8034)
  Stop shadow-banned users from sending invites. (#8095)
  ...
Diffstat (limited to 'synapse/http/matrixfederationclient.py')
-rw-r--r--synapse/http/matrixfederationclient.py204
1 files changed, 127 insertions, 77 deletions
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py

index 2d47b9ea00..738be43f46 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py
@@ -17,11 +17,9 @@ import cgi import logging import random import sys +import urllib from io import BytesIO -from six import raise_from, string_types -from six.moves import urllib - import attr import treq from canonicaljson import encode_canonical_json @@ -31,10 +29,11 @@ from zope.interface import implementer from twisted.internet import defer, protocol from twisted.internet.error import DNSLookupError -from twisted.internet.interfaces import IReactorPluggableNameResolver +from twisted.internet.interfaces import IReactorPluggableNameResolver, IReactorTime from twisted.internet.task import _EPSILON, Cooperator from twisted.web._newclient import ResponseDone from twisted.web.http_headers import Headers +from twisted.web.iweb import IResponse import synapse.metrics import synapse.util.retryutils @@ -76,7 +75,7 @@ MAXINT = sys.maxsize _next_id = 1 -@attr.s +@attr.s(frozen=True) class MatrixFederationRequest(object): method = attr.ib() """HTTP method @@ -112,27 +111,52 @@ class MatrixFederationRequest(object): :type: str|None """ + uri = attr.ib(init=False, type=bytes) + """The URI of this request + """ + def __attrs_post_init__(self): global _next_id - self.txn_id = "%s-O-%s" % (self.method, _next_id) + txn_id = "%s-O-%s" % (self.method, _next_id) _next_id = (_next_id + 1) % (MAXINT - 1) + object.__setattr__(self, "txn_id", txn_id) + + destination_bytes = self.destination.encode("ascii") + path_bytes = self.path.encode("ascii") + if self.query: + query_bytes = encode_query_args(self.query) + else: + query_bytes = b"" + + # The object is frozen so we can pre-compute this. + uri = urllib.parse.urlunparse( + (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") + ) + object.__setattr__(self, "uri", uri) + def get_json(self): if self.json_callback: return self.json_callback() return self.json -@defer.inlineCallbacks -def _handle_json_response(reactor, timeout_sec, request, response): +async def _handle_json_response( + reactor: IReactorTime, + timeout_sec: float, + request: MatrixFederationRequest, + response: IResponse, + start_ms: int, +): """ Reads the JSON body of a response, with a timeout Args: - reactor (IReactor): twisted reactor, for the timeout - timeout_sec (float): number of seconds to wait for response to complete - request (MatrixFederationRequest): the request that triggered the response - response (IResponse): response to the request + reactor: twisted reactor, for the timeout + timeout_sec: number of seconds to wait for response to complete + request: the request that triggered the response + response: response to the request + start_ms: Timestamp when request was made Returns: dict: parsed JSON response @@ -143,26 +167,38 @@ def _handle_json_response(reactor, timeout_sec, request, response): d = treq.json_content(response) d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor) - body = yield make_deferred_yieldable(d) + body = await make_deferred_yieldable(d) except TimeoutError as e: logger.warning( - "{%s} [%s] Timed out reading response", request.txn_id, request.destination, + "{%s} [%s] Timed out reading response - %s %s", + request.txn_id, + request.destination, + request.method, + request.uri.decode("ascii"), ) raise RequestSendFailed(e, can_retry=True) from e except Exception as e: logger.warning( - "{%s} [%s] Error reading response: %s", + "{%s} [%s] Error reading response %s %s: %s", request.txn_id, request.destination, + request.method, + request.uri.decode("ascii"), e, ) raise + + time_taken_secs = reactor.seconds() - start_ms / 1000 + logger.info( - "{%s} [%s] Completed: %d %s", + "{%s} [%s] Completed request: %d %s in %.2f secs - %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), + time_taken_secs, + request.method, + request.uri.decode("ascii"), ) return body @@ -178,7 +214,7 @@ class MatrixFederationHttpClient(object): def __init__(self, hs, tls_client_options_factory): self.hs = hs - self.signing_key = hs.config.signing_key[0] + self.signing_key = hs.signing_key self.server_name = hs.hostname real_reactor = hs.get_reactor() @@ -199,7 +235,14 @@ class MatrixFederationHttpClient(object): self.reactor = Reactor() - self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory) + user_agent = hs.version_string + if hs.config.user_agent_suffix: + user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix) + user_agent = user_agent.encode("ascii") + + self.agent = MatrixFederationAgent( + self.reactor, tls_client_options_factory, user_agent + ) # Use a BlacklistingAgentWrapper to prevent circumventing the IP # blacklist via IP literals in server names @@ -219,8 +262,7 @@ class MatrixFederationHttpClient(object): self._cooperator = Cooperator(scheduler=schedule) - @defer.inlineCallbacks - def _send_request_with_optional_trailing_slash( + async def _send_request_with_optional_trailing_slash( self, request, try_trailing_slash_on_400=False, **send_request_args ): """Wrapper for _send_request which can optionally retry the request @@ -241,10 +283,10 @@ class MatrixFederationHttpClient(object): (except 429). Returns: - Deferred[Dict]: Parsed JSON response body. + Dict: Parsed JSON response body. """ try: - response = yield self._send_request(request, **send_request_args) + response = await self._send_request(request, **send_request_args) except HttpResponseException as e: # Received an HTTP error > 300. Check if it meets the requirements # to retry with a trailing slash @@ -258,14 +300,15 @@ class MatrixFederationHttpClient(object): # 'M_UNRECOGNIZED' which some endpoints can return when omitting a # trailing slash on Synapse <= v0.99.3. logger.info("Retrying request with trailing slash") - request.path += "/" - response = yield self._send_request(request, **send_request_args) + # Request is frozen so we create a new instance + request = attr.evolve(request, path=request.path + "/") + + response = await self._send_request(request, **send_request_args) return response - @defer.inlineCallbacks - def _send_request( + async def _send_request( self, request, retry_on_dns_fail=True, @@ -306,7 +349,7 @@ class MatrixFederationHttpClient(object): backoff_on_404 (bool): Back off if we get a 404 Returns: - Deferred[twisted.web.client.Response]: resolves with the HTTP + twisted.web.client.Response: resolves with the HTTP response object on success. Raises: @@ -330,7 +373,7 @@ class MatrixFederationHttpClient(object): ): raise FederationDeniedError(request.destination) - limiter = yield synapse.util.retryutils.get_retry_limiter( + limiter = await synapse.util.retryutils.get_retry_limiter( request.destination, self.clock, self._store, @@ -371,9 +414,7 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url_bytes = urllib.parse.urlunparse( - (b"matrix", destination_bytes, path_bytes, None, query_bytes, b"") - ) + url_bytes = request.uri url_str = url_bytes.decode("ascii") url_to_sign_bytes = urllib.parse.urlunparse( @@ -400,7 +441,7 @@ class MatrixFederationHttpClient(object): headers_dict[b"Authorization"] = auth_headers - logger.info( + logger.debug( "{%s} [%s] Sending request: %s %s; timeout %fs", request.txn_id, request.destination, @@ -428,20 +469,20 @@ class MatrixFederationHttpClient(object): reactor=self.reactor, ) - response = yield request_deferred + response = await request_deferred except TimeoutError as e: raise RequestSendFailed(e, can_retry=True) from e except DNSLookupError as e: - raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e) + raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e except Exception as e: - logger.info("Failed to send request: %s", e) - raise_from(RequestSendFailed(e, can_retry=True), e) + raise RequestSendFailed(e, can_retry=True) from e incoming_responses_counter.labels( request.method, response.code ).inc() set_tag(tags.HTTP_STATUS_CODE, response.code) + response_phrase = response.phrase.decode("ascii", errors="replace") if 200 <= response.code < 300: logger.debug( @@ -449,7 +490,7 @@ class MatrixFederationHttpClient(object): request.txn_id, request.destination, response.code, - response.phrase.decode("ascii", errors="replace"), + response_phrase, ) pass else: @@ -458,7 +499,7 @@ class MatrixFederationHttpClient(object): request.txn_id, request.destination, response.code, - response.phrase.decode("ascii", errors="replace"), + response_phrase, ) # :'( # Update transactions table? @@ -468,7 +509,7 @@ class MatrixFederationHttpClient(object): ) try: - body = yield make_deferred_yieldable(d) + body = await make_deferred_yieldable(d) except Exception as e: # Eh, we're already going to raise an exception so lets # ignore if this fails. @@ -482,18 +523,18 @@ class MatrixFederationHttpClient(object): ) body = None - e = HttpResponseException(response.code, response.phrase, body) + e = HttpResponseException(response.code, response_phrase, body) # Retry if the error is a 429 (Too Many Requests), # otherwise just raise a standard HttpResponseException if response.code == 429: - raise_from(RequestSendFailed(e, can_retry=True), e) + raise RequestSendFailed(e, can_retry=True) from e else: raise e break except RequestSendFailed as e: - logger.warning( + logger.info( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, request.destination, @@ -522,7 +563,7 @@ class MatrixFederationHttpClient(object): delay, ) - yield self.clock.sleep(delay) + await self.clock.sleep(delay) retries_left -= 1 else: raise @@ -557,13 +598,17 @@ class MatrixFederationHttpClient(object): Returns: list[bytes]: a list of headers to be added as "Authorization:" headers """ - request = {"method": method, "uri": url_bytes, "origin": self.server_name} + request = { + "method": method.decode("ascii"), + "uri": url_bytes.decode("ascii"), + "origin": self.server_name, + } if destination is not None: - request["destination"] = destination + request["destination"] = destination.decode("ascii") if destination_is is not None: - request["destination_is"] = destination_is + request["destination_is"] = destination_is.decode("ascii") if content is not None: request["content"] = content @@ -581,8 +626,7 @@ class MatrixFederationHttpClient(object): ) return auth_headers - @defer.inlineCallbacks - def put_json( + async def put_json( self, destination, path, @@ -626,7 +670,7 @@ class MatrixFederationHttpClient(object): enabled. Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -648,7 +692,9 @@ class MatrixFederationHttpClient(object): json=data, ) - response = yield self._send_request_with_optional_trailing_slash( + start_ms = self.clock.time_msec() + + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, backoff_on_404=backoff_on_404, @@ -657,14 +703,13 @@ class MatrixFederationHttpClient(object): timeout=timeout, ) - body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response + body = await _handle_json_response( + self.reactor, self.default_timeout, request, response, start_ms ) return body - @defer.inlineCallbacks - def post_json( + async def post_json( self, destination, path, @@ -697,7 +742,7 @@ class MatrixFederationHttpClient(object): args (dict): query params Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -715,7 +760,9 @@ class MatrixFederationHttpClient(object): method="POST", destination=destination, path=path, query=args, json=data ) - response = yield self._send_request( + start_ms = self.clock.time_msec() + + response = await self._send_request( request, long_retries=long_retries, timeout=timeout, @@ -727,13 +774,12 @@ class MatrixFederationHttpClient(object): else: _sec_timeout = self.default_timeout - body = yield _handle_json_response( - self.reactor, _sec_timeout, request, response + body = await _handle_json_response( + self.reactor, _sec_timeout, request, response, start_ms, ) return body - @defer.inlineCallbacks - def get_json( + async def get_json( self, destination, path, @@ -765,7 +811,7 @@ class MatrixFederationHttpClient(object): response we should try appending a trailing slash to the end of the request. Workaround for #3622 in Synapse <= v0.99.3. Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -782,7 +828,9 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) - response = yield self._send_request_with_optional_trailing_slash( + start_ms = self.clock.time_msec() + + response = await self._send_request_with_optional_trailing_slash( request, try_trailing_slash_on_400, backoff_on_404=False, @@ -791,14 +839,13 @@ class MatrixFederationHttpClient(object): timeout=timeout, ) - body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response + body = await _handle_json_response( + self.reactor, self.default_timeout, request, response, start_ms ) return body - @defer.inlineCallbacks - def delete_json( + async def delete_json( self, destination, path, @@ -826,7 +873,7 @@ class MatrixFederationHttpClient(object): args (dict): query params Returns: - Deferred[dict|list]: Succeeds when we get a 2xx HTTP response. The + dict|list: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. Raises: @@ -843,20 +890,21 @@ class MatrixFederationHttpClient(object): method="DELETE", destination=destination, path=path, query=args ) - response = yield self._send_request( + start_ms = self.clock.time_msec() + + response = await self._send_request( request, long_retries=long_retries, timeout=timeout, ignore_backoff=ignore_backoff, ) - body = yield _handle_json_response( - self.reactor, self.default_timeout, request, response + body = await _handle_json_response( + self.reactor, self.default_timeout, request, response, start_ms ) return body - @defer.inlineCallbacks - def get_file( + async def get_file( self, destination, path, @@ -876,7 +924,7 @@ class MatrixFederationHttpClient(object): and try the request anyway. Returns: - Deferred[tuple[int, dict]]: Resolves with an (int,dict) tuple of + tuple[int, dict]: Resolves with an (int,dict) tuple of the file length and a dict of the response headers. Raises: @@ -893,7 +941,7 @@ class MatrixFederationHttpClient(object): method="GET", destination=destination, path=path, query=args ) - response = yield self._send_request( + response = await self._send_request( request, retry_on_dns_fail=retry_on_dns_fail, ignore_backoff=ignore_backoff ) @@ -902,7 +950,7 @@ class MatrixFederationHttpClient(object): try: d = _readBodyToFile(response, output_stream, max_size) d.addTimeout(self.default_timeout, self.reactor) - length = yield make_deferred_yieldable(d) + length = await make_deferred_yieldable(d) except Exception as e: logger.warning( "{%s} [%s] Error reading response: %s", @@ -912,12 +960,14 @@ class MatrixFederationHttpClient(object): ) raise logger.info( - "{%s} [%s] Completed: %d %s [%d bytes]", + "{%s} [%s] Completed: %d %s [%d bytes] %s %s", request.txn_id, request.destination, response.code, response.phrase.decode("ascii", errors="replace"), length, + request.method, + request.uri.decode("ascii"), ) return (length, headers) @@ -998,7 +1048,7 @@ def encode_query_args(args): encoded_args = {} for k, vs in args.items(): - if isinstance(vs, string_types): + if isinstance(vs, str): vs = [vs] encoded_args[k] = [v.encode("UTF-8") for v in vs]