From 219606a6eda4aac01bdb24f69ec0d418689e653e Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Wed, 26 Sep 2018 13:26:27 -0600 Subject: Fix exception documentation in matrixfederationclient.py --- synapse/http/matrixfederationclient.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 14b12cd1c4..1a79f6305b 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -230,7 +230,7 @@ class MatrixFederationHttpClient(object): Returns: Deferred: resolves with the http response object on success. - Fails with ``HTTPRequestException``: if we get an HTTP response + Fails with ``HttpResponseException``: if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -478,7 +478,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -532,7 +532,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -587,7 +587,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -638,7 +638,7 @@ class MatrixFederationHttpClient(object): Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. - Fails with ``HTTPRequestException`` if we get an HTTP response + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300. Fails with ``NotRetryingDestination`` if we are not yet ready @@ -682,7 +682,7 @@ class MatrixFederationHttpClient(object): Deferred: resolves with an (int,dict) tuple of the file length and a dict of the response headers. - Fails with ``HTTPRequestException`` if we get an HTTP response code + Fails with ``HttpResponseException`` if we get an HTTP response code >= 300 Fails with ``NotRetryingDestination`` if we are not yet ready -- cgit 1.5.1 From b8a5b0097c4fc88a43c5c5f9785720d38299d3d8 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 16 Oct 2018 10:44:49 +0100 Subject: Various cleanups in the federation client code (#4031) - Improve logging: log things in the right order, include destination and txids in all log lines, don't log successful responses twice - Fix the docstring on TransportLayerClient.send_transaction - Don't use treq.request, which is overcomplicated for our purposes: just use a twisted.web.client.Agent. - simplify the logic for setting up the bodyProducer - fix bytes/str confusions --- changelog.d/4031.misc | 1 + synapse/federation/transaction_queue.py | 27 +++++------- synapse/federation/transport/client.py | 19 ++++---- synapse/http/matrixfederationclient.py | 78 +++++++++++++++++---------------- 4 files changed, 64 insertions(+), 61 deletions(-) create mode 100644 changelog.d/4031.misc (limited to 'synapse/http') diff --git a/changelog.d/4031.misc b/changelog.d/4031.misc new file mode 100644 index 0000000000..60be8b59fd --- /dev/null +++ b/changelog.d/4031.misc @@ -0,0 +1 @@ +Various cleanups in the federation client code diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 98b5950800..3fdd63be95 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -633,14 +633,6 @@ class TransactionQueue(object): transaction, json_data_cb ) code = 200 - - if response: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: - logger.warn( - "Transaction returned error for %s: %s", - e_id, r, - ) except HttpResponseException as e: code = e.code response = e.response @@ -657,19 +649,24 @@ class TransactionQueue(object): destination, txn_id, code ) - logger.debug("TX [%s] Sent transaction", destination) - logger.debug("TX [%s] Marking as delivered...", destination) - yield self.transaction_actions.delivered( transaction, code, response ) - logger.debug("TX [%s] Marked as delivered", destination) + logger.debug("TX [%s] {%s} Marked as delivered", destination, txn_id) - if code != 200: + if code == 200: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, txn_id, e_id, r, + ) + else: for p in pdus: - logger.info( - "Failed to send event %s to %s", p.event_id, destination + logger.warn( + "TX [%s] {%s} Failed to send event %s", + destination, txn_id, p.event_id, ) success = False diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 2ab973d6c8..edba5a9808 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -143,9 +143,17 @@ class TransportLayerClient(object): transaction (Transaction) Returns: - Deferred: Results of the deferred is a tuple in the form of - (response_code, response_body) where the response_body is a - python dict decoded from json + Deferred: Succeeds when we get a 2xx HTTP response. The result + will be the decoded JSON body. + + Fails with ``HTTPRequestException`` if we get an HTTP response + code >= 300. + + Fails with ``NotRetryingDestination`` if we are not yet ready + to retry this server. + + Fails with ``FederationDeniedError`` if this destination + is not on our federation whitelist """ logger.debug( "send_data dest=%s, txid=%s", @@ -170,11 +178,6 @@ class TransportLayerClient(object): backoff_on_404=True, # If we get a 404 the other side has gone ) - logger.debug( - "send_data dest=%s, txid=%s, got response: 200", - transaction.destination, transaction.transaction_id, - ) - defer.returnValue(response) @defer.inlineCallbacks diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 14b12cd1c4..fcc02fc77d 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -195,7 +195,7 @@ class MatrixFederationHttpClient(object): ) self.clock = hs.get_clock() self._store = hs.get_datastore() - self.version_string = hs.version_string.encode('ascii') + self.version_string_bytes = hs.version_string.encode('ascii') self.default_timeout = 60 def schedule(x): @@ -261,8 +261,8 @@ class MatrixFederationHttpClient(object): ignore_backoff=ignore_backoff, ) - method = request.method - destination = request.destination + method_bytes = request.method.encode("ascii") + destination_bytes = request.destination.encode("ascii") path_bytes = request.path.encode("ascii") if request.query: query_bytes = encode_query_args(request.query) @@ -270,8 +270,8 @@ class MatrixFederationHttpClient(object): query_bytes = b"" headers_dict = { - "User-Agent": [self.version_string], - "Host": [request.destination], + b"User-Agent": [self.version_string_bytes], + b"Host": [destination_bytes], } with limiter: @@ -282,50 +282,51 @@ class MatrixFederationHttpClient(object): else: retries_left = MAX_SHORT_RETRIES - url = urllib.parse.urlunparse(( - b"matrix", destination.encode("ascii"), + url_bytes = urllib.parse.urlunparse(( + b"matrix", destination_bytes, path_bytes, None, query_bytes, b"", - )).decode('ascii') + )) + url_str = url_bytes.decode('ascii') - http_url = urllib.parse.urlunparse(( + url_to_sign_bytes = urllib.parse.urlunparse(( b"", b"", path_bytes, None, query_bytes, b"", - )).decode('ascii') + )) while True: try: json = request.get_json() if json: - data = encode_canonical_json(json) - headers_dict["Content-Type"] = ["application/json"] + headers_dict[b"Content-Type"] = [b"application/json"] self.sign_request( - destination, method, http_url, headers_dict, json + destination_bytes, method_bytes, url_to_sign_bytes, + headers_dict, json, ) - else: - data = None - self.sign_request(destination, method, http_url, headers_dict) - - logger.info( - "{%s} [%s] Sending request: %s %s", - request.txn_id, destination, method, url - ) - - if data: + data = encode_canonical_json(json) producer = FileBodyProducer( BytesIO(data), - cooperator=self._cooperator + cooperator=self._cooperator, ) else: producer = None + self.sign_request( + destination_bytes, method_bytes, url_to_sign_bytes, + headers_dict, + ) - request_deferred = treq.request( - method, - url, + logger.info( + "{%s} [%s] Sending request: %s %s", + request.txn_id, request.destination, request.method, + url_str, + ) + + # we don't want all the fancy cookie and redirect handling that + # treq.request gives: just use the raw Agent. + request_deferred = self.agent.request( + method_bytes, + url_bytes, headers=Headers(headers_dict), - data=producer, - agent=self.agent, - reactor=self.hs.get_reactor(), - unbuffered=True + bodyProducer=producer, ) request_deferred = timeout_deferred( @@ -344,9 +345,9 @@ class MatrixFederationHttpClient(object): logger.warn( "{%s} [%s] Request failed: %s %s: %s", request.txn_id, - destination, - method, - url, + request.destination, + request.method, + url_str, _flatten_response_never_received(e), ) @@ -366,7 +367,7 @@ class MatrixFederationHttpClient(object): logger.debug( "{%s} [%s] Waiting %ss before re-sending...", request.txn_id, - destination, + request.destination, delay, ) @@ -378,7 +379,7 @@ class MatrixFederationHttpClient(object): logger.info( "{%s} [%s] Got response headers: %d %s", request.txn_id, - destination, + request.destination, response.code, response.phrase.decode('ascii', errors='replace'), ) @@ -411,8 +412,9 @@ class MatrixFederationHttpClient(object): destination_is must be non-None. method (bytes): The HTTP method of the request url_bytes (bytes): The URI path of the request - headers_dict (dict): Dictionary of request headers to append to - content (bytes): The body of the request + headers_dict (dict[bytes, list[bytes]]): Dictionary of request headers to + append to + content (object): The body of the request destination_is (bytes): As 'destination', but if the destination is an identity server -- cgit 1.5.1 From b69216f7680b92ccd8f92e618e1a81c9ba3c3346 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 19 Oct 2018 21:45:45 +1100 Subject: Make the metrics less racy (#4061) --- changelog.d/4061.bugfix | 1 + synapse/http/request_metrics.py | 31 ++++++++++++++++++------------- synapse/notifier.py | 6 +++--- 3 files changed, 22 insertions(+), 16 deletions(-) create mode 100644 changelog.d/4061.bugfix (limited to 'synapse/http') diff --git a/changelog.d/4061.bugfix b/changelog.d/4061.bugfix new file mode 100644 index 0000000000..94ffcf7a51 --- /dev/null +++ b/changelog.d/4061.bugfix @@ -0,0 +1 @@ +Fix some metrics being racy and causing exceptions when polled by Prometheus. diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index fedb4e6b18..62045a918b 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -39,7 +39,8 @@ outgoing_responses_counter = Counter( ) response_timer = Histogram( - "synapse_http_server_response_time_seconds", "sec", + "synapse_http_server_response_time_seconds", + "sec", ["method", "servlet", "tag", "code"], ) @@ -79,15 +80,11 @@ response_size = Counter( # than when the response was written. in_flight_requests_ru_utime = Counter( - "synapse_http_server_in_flight_requests_ru_utime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_utime_seconds", "", ["method", "servlet"] ) in_flight_requests_ru_stime = Counter( - "synapse_http_server_in_flight_requests_ru_stime_seconds", - "", - ["method", "servlet"], + "synapse_http_server_in_flight_requests_ru_stime_seconds", "", ["method", "servlet"] ) in_flight_requests_db_txn_count = Counter( @@ -134,7 +131,7 @@ def _get_in_flight_counts(): # type counts = {} for rm in reqs: - key = (rm.method, rm.name,) + key = (rm.method, rm.name) counts[key] = counts.get(key, 0) + 1 return counts @@ -175,7 +172,8 @@ class RequestMetrics(object): if context != self.start_context: logger.warn( "Context have unexpectedly changed %r, %r", - context, self.start_context + context, + self.start_context, ) return @@ -192,10 +190,10 @@ class RequestMetrics(object): resource_usage = context.get_resource_usage() response_ru_utime.labels(self.method, self.name, tag).inc( - resource_usage.ru_utime, + resource_usage.ru_utime ) response_ru_stime.labels(self.method, self.name, tag).inc( - resource_usage.ru_stime, + resource_usage.ru_stime ) response_db_txn_count.labels(self.method, self.name, tag).inc( resource_usage.db_txn_count @@ -222,8 +220,15 @@ class RequestMetrics(object): diff = new_stats - self._request_stats self._request_stats = new_stats - in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime) - in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime) + # max() is used since rapid use of ru_stime/ru_utime can end up with the + # count going backwards due to NTP, time smearing, fine-grained + # correction, or floating points. Who knows, really? + in_flight_requests_ru_utime.labels(self.method, self.name).inc( + max(diff.ru_utime, 0) + ) + in_flight_requests_ru_stime.labels(self.method, self.name).inc( + max(diff.ru_stime, 0) + ) in_flight_requests_db_txn_count.labels(self.method, self.name).inc( diff.db_txn_count diff --git a/synapse/notifier.py b/synapse/notifier.py index 340b16ce25..de02b1017e 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -186,9 +186,9 @@ class Notifier(object): def count_listeners(): all_user_streams = set() - for x in self.room_to_user_streams.values(): + for x in list(self.room_to_user_streams.values()): all_user_streams |= x - for x in self.user_to_user_stream.values(): + for x in list(self.user_to_user_stream.values()): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) @@ -196,7 +196,7 @@ class Notifier(object): LaterGauge( "synapse_notifier_rooms", "", [], - lambda: count(bool, self.room_to_user_streams.values()), + lambda: count(bool, list(self.room_to_user_streams.values())), ) LaterGauge( "synapse_notifier_users", "", [], -- cgit 1.5.1