diff --git a/synapse/http/server.py b/synapse/http/server.py
index 6dacb31037..4e4c02e170 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -25,7 +25,7 @@ from canonicaljson import encode_canonical_json, encode_pretty_printed_json, jso
from twisted.internet import defer
from twisted.python import failure
-from twisted.web import resource, server
+from twisted.web import resource
from twisted.web.server import NOT_DONE_YET
from twisted.web.util import redirectTo
@@ -37,10 +37,8 @@ from synapse.api.errors import (
SynapseError,
UnrecognizedRequestError,
)
-from synapse.http.request_metrics import requests_counter
from synapse.util.caches import intern_dict
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-from synapse.util.metrics import Measure
+from synapse.util.logcontext import preserve_fn
logger = logging.getLogger(__name__)
@@ -60,11 +58,10 @@ HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
def wrap_json_request_handler(h):
"""Wraps a request handler method with exception handling.
- Also adds logging as per wrap_request_handler_with_logging.
+ Also does the wrapping with request.processing as per wrap_async_request_handler.
The handler method must have a signature of "handle_foo(self, request)",
- where "self" must have a "clock" attribute (and "request" must be a
- SynapseRequest).
+ where "request" must be a SynapseRequest.
The handler must return a deferred. If the deferred succeeds we assume that
a response has been sent. If the deferred fails with a SynapseError we use
@@ -108,24 +105,23 @@ def wrap_json_request_handler(h):
pretty_print=_request_user_agent_is_curl(request),
)
- return wrap_request_handler_with_logging(wrapped_request_handler)
+ return wrap_async_request_handler(wrapped_request_handler)
def wrap_html_request_handler(h):
"""Wraps a request handler method with exception handling.
- Also adds logging as per wrap_request_handler_with_logging.
+ Also does the wrapping with request.processing as per wrap_async_request_handler.
The handler method must have a signature of "handle_foo(self, request)",
- where "self" must have a "clock" attribute (and "request" must be a
- SynapseRequest).
+ where "request" must be a SynapseRequest.
"""
def wrapped_request_handler(self, request):
d = defer.maybeDeferred(h, self, request)
d.addErrback(_return_html_error, request)
return d
- return wrap_request_handler_with_logging(wrapped_request_handler)
+ return wrap_async_request_handler(wrapped_request_handler)
def _return_html_error(f, request):
@@ -170,46 +166,26 @@ def _return_html_error(f, request):
finish_request(request)
-def wrap_request_handler_with_logging(h):
- """Wraps a request handler to provide logging and metrics
+def wrap_async_request_handler(h):
+ """Wraps an async request handler so that it calls request.processing.
+
+ This helps ensure that work done by the request handler after the request is completed
+ is correctly recorded against the request metrics/logs.
The handler method must have a signature of "handle_foo(self, request)",
- where "self" must have a "clock" attribute (and "request" must be a
- SynapseRequest).
+ where "request" must be a SynapseRequest.
- As well as calling `request.processing` (which will log the response and
- duration for this request), the wrapped request handler will insert the
- request id into the logging context.
+ The handler may return a deferred, in which case the completion of the request isn't
+ logged until the deferred completes.
"""
@defer.inlineCallbacks
- def wrapped_request_handler(self, request):
- """
- Args:
- self:
- request (synapse.http.site.SynapseRequest):
- """
+ def wrapped_async_request_handler(self, request):
+ with request.processing():
+ yield h(self, request)
- request_id = request.get_request_id()
- with LoggingContext(request_id) as request_context:
- request_context.request = request_id
- with Measure(self.clock, "wrapped_request_handler"):
- # we start the request metrics timer here with an initial stab
- # at the servlet name. For most requests that name will be
- # JsonResource (or a subclass), and JsonResource._async_render
- # will update it once it picks a servlet.
- servlet_name = self.__class__.__name__
- with request.processing(servlet_name):
- with PreserveLoggingContext(request_context):
- d = defer.maybeDeferred(h, self, request)
-
- # record the arrival of the request *after*
- # dispatching to the handler, so that the handler
- # can update the servlet name in the request
- # metrics
- requests_counter.labels(request.method,
- request.request_metrics.name).inc()
- yield d
- return wrapped_request_handler
+ # we need to preserve_fn here, because the synchronous render method won't yield for
+ # us (obviously)
+ return preserve_fn(wrapped_async_request_handler)
class HttpServer(object):
@@ -272,7 +248,7 @@ class JsonResource(HttpServer, resource.Resource):
""" This gets called by twisted every time someone sends us a request.
"""
self._async_render(request)
- return server.NOT_DONE_YET
+ return NOT_DONE_YET
@wrap_json_request_handler
@defer.inlineCallbacks
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 5fd30a4c2c..f5a8f78406 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import contextlib
import logging
import time
@@ -19,8 +18,8 @@ import time
from twisted.web.server import Request, Site
from synapse.http import redact_uri
-from synapse.http.request_metrics import RequestMetrics
-from synapse.util.logcontext import ContextResourceUsage, LoggingContext
+from synapse.http.request_metrics import RequestMetrics, requests_counter
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
@@ -34,25 +33,43 @@ class SynapseRequest(Request):
It extends twisted's twisted.web.server.Request, and adds:
* Unique request ID
+ * A log context associated with the request
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
- It provides a method `processing` which should be called by the Resource
- which is handling the request, and returns a context manager.
+ It also provides a method `processing`, which returns a context manager. If this
+ method is called, the request won't be logged until the context manager is closed;
+ this is useful for asynchronous request handlers which may go on processing the
+ request even after the client has disconnected.
+ Attributes:
+ logcontext(LoggingContext) : the log context for this request
"""
def __init__(self, site, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self.site = site
- self._channel = channel
+ self._channel = channel # this is used by the tests
self.authenticated_entity = None
self.start_time = 0
+ # we can't yet create the logcontext, as we don't know the method.
+ self.logcontext = None
+
global _next_request_seq
self.request_seq = _next_request_seq
_next_request_seq += 1
+ # whether an asynchronous request handler has called processing()
+ self._is_processing = False
+
+ # the time when the asynchronous request handler completed its processing
+ self._processing_finished_time = None
+
+ # what time we finished sending the response to the client (or the connection
+ # dropped)
+ self.finish_time = None
+
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
@@ -74,11 +91,116 @@ class SynapseRequest(Request):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def render(self, resrc):
+ # this is called once a Resource has been found to serve the request; in our
+ # case the Resource in question will normally be a JsonResource.
+
+ # create a LogContext for this request
+ request_id = self.get_request_id()
+ logcontext = self.logcontext = LoggingContext(request_id)
+ logcontext.request = request_id
+
# override the Server header which is set by twisted
self.setHeader("Server", self.site.server_version_string)
- return Request.render(self, resrc)
+
+ with PreserveLoggingContext(self.logcontext):
+ # we start the request metrics timer here with an initial stab
+ # at the servlet name. For most requests that name will be
+ # JsonResource (or a subclass), and JsonResource._async_render
+ # will update it once it picks a servlet.
+ servlet_name = resrc.__class__.__name__
+ self._started_processing(servlet_name)
+
+ Request.render(self, resrc)
+
+ # record the arrival of the request *after*
+ # dispatching to the handler, so that the handler
+ # can update the servlet name in the request
+ # metrics
+ requests_counter.labels(self.method,
+ self.request_metrics.name).inc()
+
+ @contextlib.contextmanager
+ def processing(self):
+ """Record the fact that we are processing this request.
+
+ Returns a context manager; the correct way to use this is:
+
+ @defer.inlineCallbacks
+ def handle_request(request):
+ with request.processing("FooServlet"):
+ yield really_handle_the_request()
+
+ Once the context manager is closed, the completion of the request will be logged,
+ and the various metrics will be updated.
+ """
+ if self._is_processing:
+ raise RuntimeError("Request is already processing")
+ self._is_processing = True
+
+ try:
+ yield
+ except Exception:
+ # this should already have been caught, and sent back to the client as a 500.
+ logger.exception("Asynchronous messge handler raised an uncaught exception")
+ finally:
+ # the request handler has finished its work and either sent the whole response
+ # back, or handed over responsibility to a Producer.
+
+ self._processing_finished_time = time.time()
+ self._is_processing = False
+
+ # if we've already sent the response, log it now; otherwise, we wait for the
+ # response to be sent.
+ if self.finish_time is not None:
+ self._finished_processing()
+
+ def finish(self):
+ """Called when all response data has been written to this Request.
+
+ Overrides twisted.web.server.Request.finish to record the finish time and do
+ logging.
+ """
+ self.finish_time = time.time()
+ Request.finish(self)
+ if not self._is_processing:
+ with PreserveLoggingContext(self.logcontext):
+ self._finished_processing()
+
+ def connectionLost(self, reason):
+ """Called when the client connection is closed before the response is written.
+
+ Overrides twisted.web.server.Request.connectionLost to record the finish time and
+ do logging.
+ """
+ self.finish_time = time.time()
+ Request.connectionLost(self, reason)
+
+ # we only get here if the connection to the client drops before we send
+ # the response.
+ #
+ # It's useful to log it here so that we can get an idea of when
+ # the client disconnects.
+ with PreserveLoggingContext(self.logcontext):
+ logger.warn(
+ "Error processing request: %s %s", reason.type, reason.value,
+ )
+
+ if not self._is_processing:
+ self._finished_processing()
def _started_processing(self, servlet_name):
+ """Record the fact that we are processing this request.
+
+ This will log the request's arrival. Once the request completes,
+ be sure to call finished_processing.
+
+ Args:
+ servlet_name (str): the name of the servlet which will be
+ processing this request. This is used in the metrics.
+
+ It is possible to update this afterwards by updating
+ self.request_metrics.name.
+ """
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
@@ -94,13 +216,21 @@ class SynapseRequest(Request):
)
def _finished_processing(self):
- try:
- context = LoggingContext.current_context()
- usage = context.get_resource_usage()
- except Exception:
- usage = ContextResourceUsage()
+ """Log the completion of this request and update the metrics
+ """
+
+ usage = self.logcontext.get_resource_usage()
+
+ if self._processing_finished_time is None:
+ # we completed the request without anything calling processing()
+ self._processing_finished_time = time.time()
- end_time = time.time()
+ # the time between receiving the request and the request handler finishing
+ processing_time = self._processing_finished_time - self.start_time
+
+ # the time between the request handler finishing and the response being sent
+ # to the client (nb may be negative)
+ response_send_time = self.finish_time - self._processing_finished_time
# need to decode as it could be raw utf-8 bytes
# from a IDN servname in an auth header
@@ -116,22 +246,31 @@ class SynapseRequest(Request):
user_agent = self.get_user_agent()
if user_agent is not None:
user_agent = user_agent.decode("utf-8", "replace")
+ else:
+ user_agent = "-"
+
+ code = str(self.code)
+ if not self.finished:
+ # we didn't send the full response before we gave up (presumably because
+ # the connection dropped)
+ code += "!"
self.site.access_logger.info(
"%s - %s - {%s}"
- " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
+ " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
authenticated_entity,
- end_time - self.start_time,
+ processing_time,
+ response_send_time,
usage.ru_utime,
usage.ru_stime,
usage.db_sched_duration_sec,
usage.db_txn_duration_sec,
int(usage.db_txn_count),
self.sentLength,
- self.code,
+ code,
self.method,
self.get_redacted_uri(),
self.clientproto,
@@ -140,38 +279,10 @@ class SynapseRequest(Request):
)
try:
- self.request_metrics.stop(end_time, self)
+ self.request_metrics.stop(self.finish_time, self)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
- @contextlib.contextmanager
- def processing(self, servlet_name):
- """Record the fact that we are processing this request.
-
- Returns a context manager; the correct way to use this is:
-
- @defer.inlineCallbacks
- def handle_request(request):
- with request.processing("FooServlet"):
- yield really_handle_the_request()
-
- This will log the request's arrival. Once the context manager is
- closed, the completion of the request will be logged, and the various
- metrics will be updated.
-
- Args:
- servlet_name (str): the name of the servlet which will be
- processing this request. This is used in the metrics.
-
- It is possible to update this afterwards by updating
- self.request_metrics.servlet_name.
- """
- # TODO: we should probably just move this into render() and finish(),
- # to save having to call a separate method.
- self._started_processing(servlet_name)
- yield
- self._finished_processing()
-
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):
|