diff options
author | Richard van der Hoff <1389908+richvdh@users.noreply.github.com> | 2018-08-17 14:57:45 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-17 14:57:45 +0100 |
commit | 3f8709ffe49f5d72cc6f46ef6fd386a6206bebb9 (patch) | |
tree | 65653b98566731efb846c79f08261ecd2bd3adb3 /synapse/http | |
parent | Merge pull request #3707 from matrix-org/neilj/limit_exceeded_error (diff) | |
parent | changelog (diff) | |
download | synapse-3f8709ffe49f5d72cc6f46ef6fd386a6206bebb9.tar.xz |
Merge pull request #3700 from matrix-org/rav/wait_for_producers
Refactor request logging code
Diffstat (limited to 'synapse/http')
-rw-r--r-- | synapse/http/server.py | 70 | ||||
-rw-r--r-- | synapse/http/site.py | 201 |
2 files changed, 179 insertions, 92 deletions
diff --git a/synapse/http/server.py b/synapse/http/server.py index 6dacb31037..4e4c02e170 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -25,7 +25,7 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso from twisted.internet import defer from twisted.python import failure -from twisted.web import resource, server +from twisted.web import resource from twisted.web.server import NOT_DONE_YET from twisted.web.util import redirectTo @@ -37,10 +37,8 @@ from synapse.api.errors import ( SynapseError, UnrecognizedRequestError, ) -from synapse.http.request_metrics import requests_counter from synapse.util.caches import intern_dict -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext -from synapse.util.metrics import Measure +from synapse.util.logcontext import preserve_fn logger = logging.getLogger(__name__) @@ -60,11 +58,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html> def wrap_json_request_handler(h): """Wraps a request handler method with exception handling. - Also adds logging as per wrap_request_handler_with_logging. + Also does the wrapping with request.processing as per wrap_async_request_handler. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. The handler must return a deferred. If the deferred succeeds we assume that a response has been sent. If the deferred fails with a SynapseError we use @@ -108,24 +105,23 @@ def wrap_json_request_handler(h): pretty_print=_request_user_agent_is_curl(request), ) - return wrap_request_handler_with_logging(wrapped_request_handler) + return wrap_async_request_handler(wrapped_request_handler) def wrap_html_request_handler(h): """Wraps a request handler method with exception handling. - Also adds logging as per wrap_request_handler_with_logging. + Also does the wrapping with request.processing as per wrap_async_request_handler. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. """ def wrapped_request_handler(self, request): d = defer.maybeDeferred(h, self, request) d.addErrback(_return_html_error, request) return d - return wrap_request_handler_with_logging(wrapped_request_handler) + return wrap_async_request_handler(wrapped_request_handler) def _return_html_error(f, request): @@ -170,46 +166,26 @@ def _return_html_error(f, request): finish_request(request) -def wrap_request_handler_with_logging(h): - """Wraps a request handler to provide logging and metrics +def wrap_async_request_handler(h): + """Wraps an async request handler so that it calls request.processing. + + This helps ensure that work done by the request handler after the request is completed + is correctly recorded against the request metrics/logs. The handler method must have a signature of "handle_foo(self, request)", - where "self" must have a "clock" attribute (and "request" must be a - SynapseRequest). + where "request" must be a SynapseRequest. - As well as calling `request.processing` (which will log the response and - duration for this request), the wrapped request handler will insert the - request id into the logging context. + The handler may return a deferred, in which case the completion of the request isn't + logged until the deferred completes. """ @defer.inlineCallbacks - def wrapped_request_handler(self, request): - """ - Args: - self: - request (synapse.http.site.SynapseRequest): - """ + def wrapped_async_request_handler(self, request): + with request.processing(): + yield h(self, request) - request_id = request.get_request_id() - with LoggingContext(request_id) as request_context: - request_context.request = request_id - with Measure(self.clock, "wrapped_request_handler"): - # we start the request metrics timer here with an initial stab - # at the servlet name. For most requests that name will be - # JsonResource (or a subclass), and JsonResource._async_render - # will update it once it picks a servlet. - servlet_name = self.__class__.__name__ - with request.processing(servlet_name): - with PreserveLoggingContext(request_context): - d = defer.maybeDeferred(h, self, request) - - # record the arrival of the request *after* - # dispatching to the handler, so that the handler - # can update the servlet name in the request - # metrics - requests_counter.labels(request.method, - request.request_metrics.name).inc() - yield d - return wrapped_request_handler + # we need to preserve_fn here, because the synchronous render method won't yield for + # us (obviously) + return preserve_fn(wrapped_async_request_handler) class HttpServer(object): @@ -272,7 +248,7 @@ class JsonResource(HttpServer, resource.Resource): """ This gets called by twisted every time someone sends us a request. """ self._async_render(request) - return server.NOT_DONE_YET + return NOT_DONE_YET @wrap_json_request_handler @defer.inlineCallbacks diff --git a/synapse/http/site.py b/synapse/http/site.py index 5fd30a4c2c..f5a8f78406 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -11,7 +11,6 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - import contextlib import logging import time @@ -19,8 +18,8 @@ import time from twisted.web.server import Request, Site from synapse.http import redact_uri -from synapse.http.request_metrics import RequestMetrics -from synapse.util.logcontext import ContextResourceUsage, LoggingContext +from synapse.http.request_metrics import RequestMetrics, requests_counter +from synapse.util.logcontext import LoggingContext, PreserveLoggingContext logger = logging.getLogger(__name__) @@ -34,25 +33,43 @@ class SynapseRequest(Request): It extends twisted's twisted.web.server.Request, and adds: * Unique request ID + * A log context associated with the request * Redaction of access_token query-params in __repr__ * Logging at start and end * Metrics to record CPU, wallclock and DB time by endpoint. - It provides a method `processing` which should be called by the Resource - which is handling the request, and returns a context manager. + It also provides a method `processing`, which returns a context manager. If this + method is called, the request won't be logged until the context manager is closed; + this is useful for asynchronous request handlers which may go on processing the + request even after the client has disconnected. + Attributes: + logcontext(LoggingContext) : the log context for this request """ def __init__(self, site, channel, *args, **kw): Request.__init__(self, channel, *args, **kw) self.site = site - self._channel = channel + self._channel = channel # this is used by the tests self.authenticated_entity = None self.start_time = 0 + # we can't yet create the logcontext, as we don't know the method. + self.logcontext = None + global _next_request_seq self.request_seq = _next_request_seq _next_request_seq += 1 + # whether an asynchronous request handler has called processing() + self._is_processing = False + + # the time when the asynchronous request handler completed its processing + self._processing_finished_time = None + + # what time we finished sending the response to the client (or the connection + # dropped) + self.finish_time = None + def __repr__(self): # We overwrite this so that we don't log ``access_token`` return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % ( @@ -74,11 +91,116 @@ class SynapseRequest(Request): return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1] def render(self, resrc): + # this is called once a Resource has been found to serve the request; in our + # case the Resource in question will normally be a JsonResource. + + # create a LogContext for this request + request_id = self.get_request_id() + logcontext = self.logcontext = LoggingContext(request_id) + logcontext.request = request_id + # override the Server header which is set by twisted self.setHeader("Server", self.site.server_version_string) - return Request.render(self, resrc) + + with PreserveLoggingContext(self.logcontext): + # we start the request metrics timer here with an initial stab + # at the servlet name. For most requests that name will be + # JsonResource (or a subclass), and JsonResource._async_render + # will update it once it picks a servlet. + servlet_name = resrc.__class__.__name__ + self._started_processing(servlet_name) + + Request.render(self, resrc) + + # record the arrival of the request *after* + # dispatching to the handler, so that the handler + # can update the servlet name in the request + # metrics + requests_counter.labels(self.method, + self.request_metrics.name).inc() + + @contextlib.contextmanager + def processing(self): + """Record the fact that we are processing this request. + + Returns a context manager; the correct way to use this is: + + @defer.inlineCallbacks + def handle_request(request): + with request.processing("FooServlet"): + yield really_handle_the_request() + + Once the context manager is closed, the completion of the request will be logged, + and the various metrics will be updated. + """ + if self._is_processing: + raise RuntimeError("Request is already processing") + self._is_processing = True + + try: + yield + except Exception: + # this should already have been caught, and sent back to the client as a 500. + logger.exception("Asynchronous messge handler raised an uncaught exception") + finally: + # the request handler has finished its work and either sent the whole response + # back, or handed over responsibility to a Producer. + + self._processing_finished_time = time.time() + self._is_processing = False + + # if we've already sent the response, log it now; otherwise, we wait for the + # response to be sent. + if self.finish_time is not None: + self._finished_processing() + + def finish(self): + """Called when all response data has been written to this Request. + + Overrides twisted.web.server.Request.finish to record the finish time and do + logging. + """ + self.finish_time = time.time() + Request.finish(self) + if not self._is_processing: + with PreserveLoggingContext(self.logcontext): + self._finished_processing() + + def connectionLost(self, reason): + """Called when the client connection is closed before the response is written. + + Overrides twisted.web.server.Request.connectionLost to record the finish time and + do logging. + """ + self.finish_time = time.time() + Request.connectionLost(self, reason) + + # we only get here if the connection to the client drops before we send + # the response. + # + # It's useful to log it here so that we can get an idea of when + # the client disconnects. + with PreserveLoggingContext(self.logcontext): + logger.warn( + "Error processing request: %s %s", reason.type, reason.value, + ) + + if not self._is_processing: + self._finished_processing() def _started_processing(self, servlet_name): + """Record the fact that we are processing this request. + + This will log the request's arrival. Once the request completes, + be sure to call finished_processing. + + Args: + servlet_name (str): the name of the servlet which will be + processing this request. This is used in the metrics. + + It is possible to update this afterwards by updating + self.request_metrics.name. + """ self.start_time = time.time() self.request_metrics = RequestMetrics() self.request_metrics.start( @@ -94,13 +216,21 @@ class SynapseRequest(Request): ) def _finished_processing(self): - try: - context = LoggingContext.current_context() - usage = context.get_resource_usage() - except Exception: - usage = ContextResourceUsage() + """Log the completion of this request and update the metrics + """ + + usage = self.logcontext.get_resource_usage() + + if self._processing_finished_time is None: + # we completed the request without anything calling processing() + self._processing_finished_time = time.time() - end_time = time.time() + # the time between receiving the request and the request handler finishing + processing_time = self._processing_finished_time - self.start_time + + # the time between the request handler finishing and the response being sent + # to the client (nb may be negative) + response_send_time = self.finish_time - self._processing_finished_time # need to decode as it could be raw utf-8 bytes # from a IDN servname in an auth header @@ -116,22 +246,31 @@ class SynapseRequest(Request): user_agent = self.get_user_agent() if user_agent is not None: user_agent = user_agent.decode("utf-8", "replace") + else: + user_agent = "-" + + code = str(self.code) + if not self.finished: + # we didn't send the full response before we gave up (presumably because + # the connection dropped) + code += "!" self.site.access_logger.info( "%s - %s - {%s}" - " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" + " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)" " %sB %s \"%s %s %s\" \"%s\" [%d dbevts]", self.getClientIP(), self.site.site_tag, authenticated_entity, - end_time - self.start_time, + processing_time, + response_send_time, usage.ru_utime, usage.ru_stime, usage.db_sched_duration_sec, usage.db_txn_duration_sec, int(usage.db_txn_count), self.sentLength, - self.code, + code, self.method, self.get_redacted_uri(), self.clientproto, @@ -140,38 +279,10 @@ class SynapseRequest(Request): ) try: - self.request_metrics.stop(end_time, self) + self.request_metrics.stop(self.finish_time, self) except Exception as e: logger.warn("Failed to stop metrics: %r", e) - @contextlib.contextmanager - def processing(self, servlet_name): - """Record the fact that we are processing this request. - - Returns a context manager; the correct way to use this is: - - @defer.inlineCallbacks - def handle_request(request): - with request.processing("FooServlet"): - yield really_handle_the_request() - - This will log the request's arrival. Once the context manager is - closed, the completion of the request will be logged, and the various - metrics will be updated. - - Args: - servlet_name (str): the name of the servlet which will be - processing this request. This is used in the metrics. - - It is possible to update this afterwards by updating - self.request_metrics.servlet_name. - """ - # TODO: we should probably just move this into render() and finish(), - # to save having to call a separate method. - self._started_processing(servlet_name) - yield - self._finished_processing() - class XForwardedForRequest(SynapseRequest): def __init__(self, *args, **kw): |