From 83f119a84a592ccf0d2dc06e699bcf11eb4380b0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 13:05:11 +0100 Subject: Log requests and responses sent via http.client --- synapse/http/client.py | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index e746f2416e..9091ae2d38 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -61,21 +61,31 @@ class SimpleHttpClient(object): self.agent = Agent(reactor, pool=pool) self.version_string = hs.version_string - def request(self, method, *args, **kwargs): + def request(self, method, uri, *args, **kwargs): # A small wrapper around self.agent.request() so we can easily attach # counters to it outgoing_requests_counter.inc(method) d = preserve_context_over_fn( self.agent.request, - method, *args, **kwargs + method, uri, *args, **kwargs ) + logger.info("Sending request %s %s", method, uri) + def _cb(response): incoming_responses_counter.inc(method, response.code) + logger.info( + "Received response to %s %s: %s", + method, uri, response.code + ) return response def _eb(failure): incoming_responses_counter.inc(method, "ERR") + logger.info( + "Error sending request to %s %s: %s %s", + method, uri, failure.type, failure.getErrorMessage() + ) return failure d.addCallbacks(_cb, _eb) @@ -84,7 +94,9 @@ class SimpleHttpClient(object): @defer.inlineCallbacks def post_urlencoded_get_json(self, uri, args={}): + # TODO: Do we ever want to log message contents? logger.debug("post_urlencoded_get_json args: %s", args) + query_bytes = urllib.urlencode(args, True) response = yield self.request( @@ -105,7 +117,7 @@ class SimpleHttpClient(object): def post_json_get_json(self, uri, post_json): json_str = encode_canonical_json(post_json) - logger.info("HTTP POST %s -> %s", json_str, uri) + logger.debug("HTTP POST %s -> %s", json_str, uri) response = yield self.request( "POST", -- cgit 1.4.1 From f00f8346f143dc306e184b6d479294ab11a4ff55 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 13:37:58 +0100 Subject: Make http.server request logging more verbose, but redact access_tokens --- synapse/http/server.py | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/server.py b/synapse/http/server.py index ae8f3b3972..e6e8a59f6c 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -32,6 +32,7 @@ from twisted.web.util import redirectTo import collections import logging +import re import urllib logger = logging.getLogger(__name__) @@ -82,9 +83,18 @@ def request_handler(request_handler): code = None start = self.clock.time_msec() try: + request_uri = request.uri + + # Don't log access_tokens + request_uri = re.sub( + r'(\?.*access_token=)[^&]*(.*)$', + r'\1\2', + request_uri + ) + logger.info( - "Received request: %s %s", - request.method, request.path + "%s - Received request: %s %s", + request.getClientIP(), request.method, request_uri ) d = request_handler(self, request) with PreserveLoggingContext(): -- cgit 1.4.1 From b5209c57441d9e7bace28a03774d2605a6572514 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 16:36:49 +0100 Subject: Create SynapseRequest that overrides __repr__ to not print access_token --- synapse/app/homeserver.py | 49 +++++++++++++++++++++++++++++++++++++++++------ synapse/http/server.py | 14 +++----------- 2 files changed, 46 insertions(+), 17 deletions(-) (limited to 'synapse/http') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 95e9122d3e..7c1ad6bc13 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -63,6 +63,7 @@ import synapse import logging import os +import re import resource import subprocess @@ -433,9 +434,34 @@ class SynapseService(service.Service): return self._port.stopListening() -class XForwardedForRequest(Request): - def __init__(self, *args, **kw): +class SynapseRequest(Request): + def __init__(self, site_tag, *args, **kw): Request.__init__(self, *args, **kw) + self.site_tag = site_tag + self.authenticated_entity = None + + def __repr__(self): + # We overwrite this so that we don't log ``access_token`` + return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % ( + self.__class__.__name__, + id(self), + self.method, + self.get_redacted_uri(), + self.clientproto, + self.site_tag, + ) + + def get_redacted_uri(self): + return re.sub( + r'(\?.*access_token=)[^&]*(.*)$', + r'\1\2', + self.uri + ) + + +class XForwardedForRequest(SynapseRequest): + def __init__(self, *args, **kw): + SynapseRequest.__init__(self, *args, **kw) """ Add a layer on top of another request that only uses the value of an @@ -451,8 +477,16 @@ class XForwardedForRequest(Request): b"x-forwarded-for", [b"-"])[0].split(b",")[0].strip() -def XForwardedFactory(*args, **kwargs): - return XForwardedForRequest(*args, **kwargs) +class SynapseRequestFactory(object): + def __init__(self, site_tag, x_forwarded_for): + self.site_tag = site_tag + self.x_forwarded_for = x_forwarded_for + + def __call__(self, *args, **kwargs): + if self.x_forwarded_for: + return XForwardedForRequest(self.site_tag, *args, **kwargs) + else: + return SynapseRequest(self.site_tag, *args, **kwargs) class SynapseSite(Site): @@ -462,8 +496,11 @@ class SynapseSite(Site): """ def __init__(self, logger_name, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - if config.get("x_forwarded", False): - self.requestFactory = XForwardedFactory + + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(None, proxied) + + if proxied: self._log_formatter = proxiedLogFormatter else: self._log_formatter = combinedLogFormatter diff --git a/synapse/http/server.py b/synapse/http/server.py index e6e8a59f6c..7f8b9dbb29 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -32,7 +32,6 @@ from twisted.web.util import redirectTo import collections import logging -import re import urllib logger = logging.getLogger(__name__) @@ -83,18 +82,11 @@ def request_handler(request_handler): code = None start = self.clock.time_msec() try: - request_uri = request.uri - - # Don't log access_tokens - request_uri = re.sub( - r'(\?.*access_token=)[^&]*(.*)$', - r'\1\2', - request_uri - ) - logger.info( "%s - Received request: %s %s", - request.getClientIP(), request.method, request_uri + request.getClientIP(), + request.method, + request.get_redacted_uri() ) d = request_handler(self, request) with PreserveLoggingContext(): -- cgit 1.4.1 From cee69441d3d3b4d966b6ec69c7dbf4eb3b876bb3 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 17:11:44 +0100 Subject: Log more when we have processed the request --- synapse/api/auth.py | 2 ++ synapse/app/homeserver.py | 10 ++++++++-- synapse/federation/transport/server.py | 1 + synapse/http/server.py | 14 ++++++++++++-- 4 files changed, 23 insertions(+), 4 deletions(-) (limited to 'synapse/http') diff --git a/synapse/api/auth.py b/synapse/api/auth.py index d5bf0be85c..4da62e5d8d 100644 --- a/synapse/api/auth.py +++ b/synapse/api/auth.py @@ -370,6 +370,8 @@ class Auth(object): user_agent=user_agent ) + request.authenticated_entity = user.to_string() + defer.returnValue((user, ClientInfo(device_id, token_id))) except KeyError: raise AuthError( diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 7c1ad6bc13..fca6f06e3b 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -143,6 +143,7 @@ class SynapseHomeServer(HomeServer): port = listener_config["port"] bind_address = listener_config.get("bind_address", "") tls = listener_config.get("tls", False) + site_tag = listener_config.get("tag", port) if tls and config.no_tls: return @@ -199,6 +200,7 @@ class SynapseHomeServer(HomeServer): port, SynapseSite( "synapse.access.https", + site_tag, listener_config, root_resource, ), @@ -210,6 +212,7 @@ class SynapseHomeServer(HomeServer): port, SynapseSite( "synapse.access.https", + site_tag, listener_config, root_resource, ), @@ -458,6 +461,9 @@ class SynapseRequest(Request): self.uri ) + def get_user_agent(self): + return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): @@ -494,11 +500,11 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, config, resource, *args, **kwargs): + def __init__(self, logger_name, tag, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(None, proxied) + self.requestFactory = SynapseRequestFactory(tag, proxied) if proxied: self._log_formatter = proxiedLogFormatter diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 31190e700a..bad93c6b2f 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -94,6 +94,7 @@ class TransportLayerServer(object): yield self.keyring.verify_json_for_server(origin, json_request) logger.info("Request from %s", origin) + request.authenticated_entity = origin defer.returnValue((origin, content)) diff --git a/synapse/http/server.py b/synapse/http/server.py index 7f8b9dbb29..34645a371a 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -125,8 +125,18 @@ def request_handler(request_handler): code = str(code) if code else "-" end = self.clock.time_msec() logger.info( - "Processed request: %dms %s %s %s", - end-start, code, request.method, request.path + "%s - %s - {%s}" + " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + request.getClientIP(), + request.site_tag, + request.authenticated_entity, + end-start, + request.sentLength, + code, + request.method, + request.get_redacted_uri(), + request.clientproto, + request.get_user_agent(), ) return wrapped_request_handler -- cgit 1.4.1 From aaa749d366f768dd164f899c1d8e5eedd44ee5f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 15 Jun 2015 18:18:05 +0100 Subject: Disable twisted access logging. Move access logging to SynapseRequest object --- synapse/app/homeserver.py | 64 ++++++++++++++++++++++++--------- synapse/http/server.py | 90 +++++++++++++++++------------------------------ 2 files changed, 79 insertions(+), 75 deletions(-) (limited to 'synapse/http') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index fca6f06e3b..7effedf7dc 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -35,7 +35,6 @@ from twisted.enterprise import adbapi from twisted.web.resource import Resource, EncodingResourceWrapper from twisted.web.static import File from twisted.web.server import Site, GzipEncoderFactory, Request -from twisted.web.http import proxiedLogFormatter, combinedLogFormatter from synapse.http.server import JsonResource, RootRedirect from synapse.rest.media.v0.content_repository import ContentRepoResource from synapse.rest.media.v1.media_repository import MediaRepositoryResource @@ -61,11 +60,13 @@ import twisted.manhole.telnet import synapse +import contextlib import logging import os import re import resource import subprocess +import time logger = logging.getLogger("synapse.app.homeserver") @@ -438,10 +439,11 @@ class SynapseService(service.Service): class SynapseRequest(Request): - def __init__(self, site_tag, *args, **kw): + def __init__(self, site, *args, **kw): Request.__init__(self, *args, **kw) - self.site_tag = site_tag + self.site = site self.authenticated_entity = None + self.start_time = 0 def __repr__(self): # We overwrite this so that we don't log ``access_token`` @@ -451,7 +453,7 @@ class SynapseRequest(Request): self.method, self.get_redacted_uri(), self.clientproto, - self.site_tag, + self.site.site_tag, ) def get_redacted_uri(self): @@ -464,6 +466,38 @@ class SynapseRequest(Request): def get_user_agent(self): return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1] + def started_processing(self): + self.site.access_logger.info( + "%s - %s - Received request: %s %s", + self.getClientIP(), + self.site.site_tag, + self.method, + self.get_redacted_uri() + ) + self.start_time = int(time.time() * 1000) + + def finished_processing(self): + self.site.access_logger.info( + "%s - %s - {%s}" + " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", + self.getClientIP(), + self.site.site_tag, + self.authenticated_entity, + int(time.time() * 1000) - self.start_time, + self.sentLength, + self.code, + self.method, + self.get_redacted_uri(), + self.clientproto, + self.get_user_agent(), + ) + + @contextlib.contextmanager + def processing(self): + self.started_processing() + yield + self.finished_processing() + class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): @@ -484,15 +518,15 @@ class XForwardedForRequest(SynapseRequest): class SynapseRequestFactory(object): - def __init__(self, site_tag, x_forwarded_for): - self.site_tag = site_tag + def __init__(self, site, x_forwarded_for): + self.site = site self.x_forwarded_for = x_forwarded_for def __call__(self, *args, **kwargs): if self.x_forwarded_for: - return XForwardedForRequest(self.site_tag, *args, **kwargs) + return XForwardedForRequest(self.site, *args, **kwargs) else: - return SynapseRequest(self.site_tag, *args, **kwargs) + return SynapseRequest(self.site, *args, **kwargs) class SynapseSite(Site): @@ -500,21 +534,17 @@ class SynapseSite(Site): Subclass of a twisted http Site that does access logging with python's standard logging """ - def __init__(self, logger_name, tag, config, resource, *args, **kwargs): + def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs): Site.__init__(self, resource, *args, **kwargs) - proxied = config.get("x_forwarded", False) - self.requestFactory = SynapseRequestFactory(tag, proxied) + self.site_tag = site_tag - if proxied: - self._log_formatter = proxiedLogFormatter - else: - self._log_formatter = combinedLogFormatter + proxied = config.get("x_forwarded", False) + self.requestFactory = SynapseRequestFactory(self, proxied) self.access_logger = logging.getLogger(logger_name) def log(self, request): - line = self._log_formatter(self._logDateTime, request) - self.access_logger.info(line) + pass def create_resource_tree(desired_tree, redirect_root_to_web_client=True): diff --git a/synapse/http/server.py b/synapse/http/server.py index 34645a371a..807ff95c65 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -79,65 +79,39 @@ def request_handler(request_handler): _next_request_id += 1 with LoggingContext(request_id) as request_context: request_context.request = request_id - code = None - start = self.clock.time_msec() - try: - logger.info( - "%s - Received request: %s %s", - request.getClientIP(), - request.method, - request.get_redacted_uri() - ) - d = request_handler(self, request) - with PreserveLoggingContext(): - yield d - code = request.code - except CodeMessageException as e: - code = e.code - if isinstance(e, SynapseError): - logger.info( - "%s SynapseError: %s - %s", request, code, e.msg + with request.processing(): + try: + d = request_handler(self, request) + with PreserveLoggingContext(): + yield d + except CodeMessageException as e: + code = e.code + if isinstance(e, SynapseError): + logger.info( + "%s SynapseError: %s - %s", request, code, e.msg + ) + else: + logger.exception(e) + outgoing_responses_counter.inc(request.method, str(code)) + respond_with_json( + request, code, cs_exception(e), send_cors=True, + pretty_print=_request_user_agent_is_curl(request), + version_string=self.version_string, + ) + except: + logger.exception( + "Failed handle request %s.%s on %r: %r", + request_handler.__module__, + request_handler.__name__, + self, + request + ) + respond_with_json( + request, + 500, + {"error": "Internal server error"}, + send_cors=True ) - else: - logger.exception(e) - outgoing_responses_counter.inc(request.method, str(code)) - respond_with_json( - request, code, cs_exception(e), send_cors=True, - pretty_print=_request_user_agent_is_curl(request), - version_string=self.version_string, - ) - except: - code = 500 - logger.exception( - "Failed handle request %s.%s on %r: %r", - request_handler.__module__, - request_handler.__name__, - self, - request - ) - respond_with_json( - request, - 500, - {"error": "Internal server error"}, - send_cors=True - ) - finally: - code = str(code) if code else "-" - end = self.clock.time_msec() - logger.info( - "%s - %s - {%s}" - " Processed request: %dms %sB %s \"%s %s %s\" \"%s\"", - request.getClientIP(), - request.site_tag, - request.authenticated_entity, - end-start, - request.sentLength, - code, - request.method, - request.get_redacted_uri(), - request.clientproto, - request.get_user_agent(), - ) return wrapped_request_handler -- cgit 1.4.1 From 9d112f444036eec00bfc44f8947f60fd48f9c7e1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 10:13:03 +0100 Subject: Add IDs to outbound transactions --- synapse/http/matrixfederationclient.py | 124 +++++++++++++++++++-------------- 1 file changed, 71 insertions(+), 53 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 7f3d8fc884..902b278419 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -35,11 +35,13 @@ from syutil.crypto.jsonsign import sign_json import simplejson as json import logging +import sys import urllib import urlparse logger = logging.getLogger(__name__) +outbound_logger = logging.getLogger("synapse.http.outbound") metrics = synapse.metrics.get_metrics_for(__name__) @@ -109,6 +111,8 @@ class MatrixFederationHttpClient(object): self.clock = hs.get_clock() self.version_string = hs.version_string + self._next_id = 1 + @defer.inlineCallbacks def _create_request(self, destination, method, path_bytes, body_callback, headers_dict={}, param_bytes=b"", @@ -123,8 +127,13 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - logger.info("Sending request to %s: %s %s", - destination, method, url_bytes) + txn_id = "%s-%s" % (method, self._next_id) + self._next_id = (self._next_id + 1) % (sys.maxint - 1) + + outbound_logger.info( + "{%s} [%s] Sending request: %s %s", + txn_id, destination, method, url_bytes + ) logger.debug( "Types: %s", @@ -141,63 +150,72 @@ class MatrixFederationHttpClient(object): endpoint = self._getEndpoint(reactor, destination) - while True: - producer = None - if body_callback: - producer = body_callback(method, url_bytes, headers_dict) - - try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + log_result = None + try: + while True: + producer = None + if body_callback: + producer = body_callback(method, url_bytes, headers_dict) + + try: + request_deferred = preserve_context_over_fn( + self.agent.request, + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, - ) + response = yield self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) + + logger.debug("{%s} Got response to %s", txn_id, method) + log_result = "%d %s" % (response.code, response.phrase,) + break + except Exception as e: + if not retry_on_dns_fail and isinstance(e, DNSLookupError): + logger.warn( + "DNS Lookup failed to %s with %s", + destination, + e + ) + log_result = "DNS Lookup failed to %s with %s" % ( + destination, e + ) + raise - logger.debug("Got response to %s", method) - break - except Exception as e: - if not retry_on_dns_fail and isinstance(e, DNSLookupError): logger.warn( - "DNS Lookup failed to %s with %s", + "{%s} Sending request failed to %s: %s %s: %s - %s", + txn_id, destination, - e + method, + url_bytes, + type(e).__name__, + _flatten_response_never_received(e), ) - raise - - logger.warn( - "Sending request failed to %s: %s %s: %s - %s", - destination, - method, - url_bytes, - type(e).__name__, - _flatten_response_never_received(e), - ) - if retries_left and not timeout: - yield sleep(2 ** (5 - retries_left)) - retries_left -= 1 - else: - raise - - logger.info( - "Received response %d %s for %s: %s %s", - response.code, - response.phrase, - destination, - method, - url_bytes - ) + log_result = "%s - %s" % ( + type(e).__name__, _flatten_response_never_received(e), + ) + + if retries_left and not timeout: + yield sleep(2 ** (5 - retries_left)) + retries_left -= 1 + else: + raise + finally: + outbound_logger.info( + "{%s} [%s] Result: %s", + txn_id, + destination, + log_result, + ) if 200 <= response.code < 300: pass -- cgit 1.4.1 From 18968efa0afacc72b91d626d1a6adc2d5476b130 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 10:18:02 +0100 Subject: Remove stale debug lines --- synapse/http/matrixfederationclient.py | 10 ---------- 1 file changed, 10 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 902b278419..1b90692731 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -135,15 +135,6 @@ class MatrixFederationHttpClient(object): txn_id, destination, method, url_bytes ) - logger.debug( - "Types: %s", - [ - type(destination), type(method), type(path_bytes), - type(param_bytes), - type(query_bytes) - ] - ) - # XXX: Would be much nicer to retry only at the transaction-layer # (once we have reliable transactions in place) retries_left = 5 @@ -175,7 +166,6 @@ class MatrixFederationHttpClient(object): time_out=timeout/1000. if timeout else 60, ) - logger.debug("{%s} Got response to %s", txn_id, method) log_result = "%d %s" % (response.code, response.phrase,) break except Exception as e: -- cgit 1.4.1 From 653533a3dac1790f218aa4978f775f8098656b11 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 19 Jun 2015 11:45:55 +0100 Subject: Fix log context when sending requests --- synapse/http/client.py | 10 +++--- synapse/http/matrixfederationclient.py | 58 ++++++++++++++++++---------------- synapse/util/logcontext.py | 52 +++++++++++++++++++----------- 3 files changed, 68 insertions(+), 52 deletions(-) (limited to 'synapse/http') diff --git a/synapse/http/client.py b/synapse/http/client.py index 9091ae2d38..49737d55da 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -109,7 +109,7 @@ class SimpleHttpClient(object): bodyProducer=FileBodyProducer(StringIO(query_bytes)) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -128,7 +128,7 @@ class SimpleHttpClient(object): bodyProducer=FileBodyProducer(StringIO(json_str)) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -161,7 +161,7 @@ class SimpleHttpClient(object): }) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) if 200 <= response.code < 300: defer.returnValue(json.loads(body)) @@ -204,7 +204,7 @@ class SimpleHttpClient(object): bodyProducer=FileBodyProducer(StringIO(json_str)) ) - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) if 200 <= response.code < 300: defer.returnValue(json.loads(body)) @@ -238,7 +238,7 @@ class CaptchaServerHttpClient(SimpleHttpClient): ) try: - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(body) except PartialDownloadError as e: # twisted dislikes google's response, no content length. diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 1b90692731..ed47e701e7 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -127,7 +127,7 @@ class MatrixFederationHttpClient(object): ("", "", path_bytes, param_bytes, query_bytes, "",) ) - txn_id = "%s-%s" % (method, self._next_id) + txn_id = "%s-O-%s" % (method, self._next_id) self._next_id = (self._next_id + 1) % (sys.maxint - 1) outbound_logger.info( @@ -139,7 +139,9 @@ class MatrixFederationHttpClient(object): # (once we have reliable transactions in place) retries_left = 5 - endpoint = self._getEndpoint(reactor, destination) + endpoint = preserve_context_over_fn( + self._getEndpoint, reactor, destination + ) log_result = None try: @@ -149,21 +151,25 @@ class MatrixFederationHttpClient(object): producer = body_callback(method, url_bytes, headers_dict) try: - request_deferred = preserve_context_over_fn( - self.agent.request, - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + def send_request(): + request_deferred = self.agent.request( + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) + + return self.clock.time_bound_deferred( + request_deferred, + time_out=timeout/1000. if timeout else 60, + ) - response = yield self.clock.time_bound_deferred( - request_deferred, - time_out=timeout/1000. if timeout else 60, + response = yield preserve_context_over_fn( + send_request, ) log_result = "%d %s" % (response.code, response.phrase,) @@ -212,7 +218,7 @@ class MatrixFederationHttpClient(object): else: # :'( # Update transactions table? - body = yield readBody(response) + body = yield preserve_context_over_fn(readBody, response) raise HttpResponseException( response.code, response.phrase, body ) @@ -292,10 +298,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") - + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @defer.inlineCallbacks @@ -338,9 +341,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -398,9 +399,7 @@ class MatrixFederationHttpClient(object): "Content-Type not application/json" ) - logger.debug("Getting resp body") - body = yield readBody(response) - logger.debug("Got resp body") + body = yield preserve_context_over_fn(readBody, response) defer.returnValue(json.loads(body)) @@ -443,7 +442,10 @@ class MatrixFederationHttpClient(object): headers = dict(response.headers.getAllRawHeaders()) try: - length = yield _readBodyToFile(response, output_stream, max_size) + length = yield preserve_context_over_fn( + _readBodyToFile, + response, output_stream, max_size + ) except: logger.exception("Failed to download body") raise diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index a92d518b43..7e6062c1b8 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -140,6 +140,37 @@ class PreserveLoggingContext(object): ) +class _PreservingContextDeferred(defer.Deferred): + """A deferred that ensures that all callbacks and errbacks are called with + the given logging context. + """ + def __init__(self, context): + self._log_context = context + defer.Deferred.__init__(self) + + def addCallbacks(self, callback, errback=None, + callbackArgs=None, callbackKeywords=None, + errbackArgs=None, errbackKeywords=None): + callback = self._wrap_callback(callback) + errback = self._wrap_callback(errback) + return defer.Deferred.addCallbacks( + self, callback, + errback=errback, + callbackArgs=callbackArgs, + callbackKeywords=callbackKeywords, + errbackArgs=errbackArgs, + errbackKeywords=errbackKeywords, + ) + + def _wrap_callback(self, f): + def g(res, *args, **kwargs): + with PreserveLoggingContext(): + LoggingContext.thread_local.current_context = self._log_context + res = f(res, *args, **kwargs) + return res + return g + + def preserve_context_over_fn(fn, *args, **kwargs): """Takes a function and invokes it with the given arguments, but removes and restores the current logging context while doing so. @@ -160,24 +191,7 @@ def preserve_context_over_deferred(deferred): """Given a deferred wrap it such that any callbacks added later to it will be invoked with the current context. """ - d = defer.Deferred() - current_context = LoggingContext.current_context() - - def cb(res): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context - res = d.callback(res) - return res - - def eb(failure): - with PreserveLoggingContext(): - LoggingContext.thread_local.current_context = current_context - res = d.errback(failure) - return res - - if deferred.called: - return deferred - - deferred.addCallbacks(cb, eb) + d = _PreservingContextDeferred(current_context) + deferred.chainDeferred(d) return d -- cgit 1.4.1