diff --git a/synapse/http/site.py b/synapse/http/site.py
index 5fd30a4c2c..f5a8f78406 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -11,7 +11,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
import contextlib
import logging
import time
@@ -19,8 +18,8 @@ import time
from twisted.web.server import Request, Site
from synapse.http import redact_uri
-from synapse.http.request_metrics import RequestMetrics
-from synapse.util.logcontext import ContextResourceUsage, LoggingContext
+from synapse.http.request_metrics import RequestMetrics, requests_counter
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
logger = logging.getLogger(__name__)
@@ -34,25 +33,43 @@ class SynapseRequest(Request):
It extends twisted's twisted.web.server.Request, and adds:
* Unique request ID
+ * A log context associated with the request
* Redaction of access_token query-params in __repr__
* Logging at start and end
* Metrics to record CPU, wallclock and DB time by endpoint.
- It provides a method `processing` which should be called by the Resource
- which is handling the request, and returns a context manager.
+ It also provides a method `processing`, which returns a context manager. If this
+ method is called, the request won't be logged until the context manager is closed;
+ this is useful for asynchronous request handlers which may go on processing the
+ request even after the client has disconnected.
+ Attributes:
+ logcontext(LoggingContext) : the log context for this request
"""
def __init__(self, site, channel, *args, **kw):
Request.__init__(self, channel, *args, **kw)
self.site = site
- self._channel = channel
+ self._channel = channel # this is used by the tests
self.authenticated_entity = None
self.start_time = 0
+ # we can't yet create the logcontext, as we don't know the method.
+ self.logcontext = None
+
global _next_request_seq
self.request_seq = _next_request_seq
_next_request_seq += 1
+ # whether an asynchronous request handler has called processing()
+ self._is_processing = False
+
+ # the time when the asynchronous request handler completed its processing
+ self._processing_finished_time = None
+
+ # what time we finished sending the response to the client (or the connection
+ # dropped)
+ self.finish_time = None
+
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
@@ -74,11 +91,116 @@ class SynapseRequest(Request):
return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
def render(self, resrc):
+ # this is called once a Resource has been found to serve the request; in our
+ # case the Resource in question will normally be a JsonResource.
+
+ # create a LogContext for this request
+ request_id = self.get_request_id()
+ logcontext = self.logcontext = LoggingContext(request_id)
+ logcontext.request = request_id
+
# override the Server header which is set by twisted
self.setHeader("Server", self.site.server_version_string)
- return Request.render(self, resrc)
+
+ with PreserveLoggingContext(self.logcontext):
+ # we start the request metrics timer here with an initial stab
+ # at the servlet name. For most requests that name will be
+ # JsonResource (or a subclass), and JsonResource._async_render
+ # will update it once it picks a servlet.
+ servlet_name = resrc.__class__.__name__
+ self._started_processing(servlet_name)
+
+ Request.render(self, resrc)
+
+ # record the arrival of the request *after*
+ # dispatching to the handler, so that the handler
+ # can update the servlet name in the request
+ # metrics
+ requests_counter.labels(self.method,
+ self.request_metrics.name).inc()
+
+ @contextlib.contextmanager
+ def processing(self):
+ """Record the fact that we are processing this request.
+
+ Returns a context manager; the correct way to use this is:
+
+ @defer.inlineCallbacks
+ def handle_request(request):
+ with request.processing("FooServlet"):
+ yield really_handle_the_request()
+
+ Once the context manager is closed, the completion of the request will be logged,
+ and the various metrics will be updated.
+ """
+ if self._is_processing:
+ raise RuntimeError("Request is already processing")
+ self._is_processing = True
+
+ try:
+ yield
+ except Exception:
+ # this should already have been caught, and sent back to the client as a 500.
+ logger.exception("Asynchronous messge handler raised an uncaught exception")
+ finally:
+ # the request handler has finished its work and either sent the whole response
+ # back, or handed over responsibility to a Producer.
+
+ self._processing_finished_time = time.time()
+ self._is_processing = False
+
+ # if we've already sent the response, log it now; otherwise, we wait for the
+ # response to be sent.
+ if self.finish_time is not None:
+ self._finished_processing()
+
+ def finish(self):
+ """Called when all response data has been written to this Request.
+
+ Overrides twisted.web.server.Request.finish to record the finish time and do
+ logging.
+ """
+ self.finish_time = time.time()
+ Request.finish(self)
+ if not self._is_processing:
+ with PreserveLoggingContext(self.logcontext):
+ self._finished_processing()
+
+ def connectionLost(self, reason):
+ """Called when the client connection is closed before the response is written.
+
+ Overrides twisted.web.server.Request.connectionLost to record the finish time and
+ do logging.
+ """
+ self.finish_time = time.time()
+ Request.connectionLost(self, reason)
+
+ # we only get here if the connection to the client drops before we send
+ # the response.
+ #
+ # It's useful to log it here so that we can get an idea of when
+ # the client disconnects.
+ with PreserveLoggingContext(self.logcontext):
+ logger.warn(
+ "Error processing request: %s %s", reason.type, reason.value,
+ )
+
+ if not self._is_processing:
+ self._finished_processing()
def _started_processing(self, servlet_name):
+ """Record the fact that we are processing this request.
+
+ This will log the request's arrival. Once the request completes,
+ be sure to call finished_processing.
+
+ Args:
+ servlet_name (str): the name of the servlet which will be
+ processing this request. This is used in the metrics.
+
+ It is possible to update this afterwards by updating
+ self.request_metrics.name.
+ """
self.start_time = time.time()
self.request_metrics = RequestMetrics()
self.request_metrics.start(
@@ -94,13 +216,21 @@ class SynapseRequest(Request):
)
def _finished_processing(self):
- try:
- context = LoggingContext.current_context()
- usage = context.get_resource_usage()
- except Exception:
- usage = ContextResourceUsage()
+ """Log the completion of this request and update the metrics
+ """
+
+ usage = self.logcontext.get_resource_usage()
+
+ if self._processing_finished_time is None:
+ # we completed the request without anything calling processing()
+ self._processing_finished_time = time.time()
- end_time = time.time()
+ # the time between receiving the request and the request handler finishing
+ processing_time = self._processing_finished_time - self.start_time
+
+ # the time between the request handler finishing and the response being sent
+ # to the client (nb may be negative)
+ response_send_time = self.finish_time - self._processing_finished_time
# need to decode as it could be raw utf-8 bytes
# from a IDN servname in an auth header
@@ -116,22 +246,31 @@ class SynapseRequest(Request):
user_agent = self.get_user_agent()
if user_agent is not None:
user_agent = user_agent.decode("utf-8", "replace")
+ else:
+ user_agent = "-"
+
+ code = str(self.code)
+ if not self.finished:
+ # we didn't send the full response before we gave up (presumably because
+ # the connection dropped)
+ code += "!"
self.site.access_logger.info(
"%s - %s - {%s}"
- " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
+ " Processed request: %.3fsec/%.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\" [%d dbevts]",
self.getClientIP(),
self.site.site_tag,
authenticated_entity,
- end_time - self.start_time,
+ processing_time,
+ response_send_time,
usage.ru_utime,
usage.ru_stime,
usage.db_sched_duration_sec,
usage.db_txn_duration_sec,
int(usage.db_txn_count),
self.sentLength,
- self.code,
+ code,
self.method,
self.get_redacted_uri(),
self.clientproto,
@@ -140,38 +279,10 @@ class SynapseRequest(Request):
)
try:
- self.request_metrics.stop(end_time, self)
+ self.request_metrics.stop(self.finish_time, self)
except Exception as e:
logger.warn("Failed to stop metrics: %r", e)
- @contextlib.contextmanager
- def processing(self, servlet_name):
- """Record the fact that we are processing this request.
-
- Returns a context manager; the correct way to use this is:
-
- @defer.inlineCallbacks
- def handle_request(request):
- with request.processing("FooServlet"):
- yield really_handle_the_request()
-
- This will log the request's arrival. Once the context manager is
- closed, the completion of the request will be logged, and the various
- metrics will be updated.
-
- Args:
- servlet_name (str): the name of the servlet which will be
- processing this request. This is used in the metrics.
-
- It is possible to update this afterwards by updating
- self.request_metrics.servlet_name.
- """
- # TODO: we should probably just move this into render() and finish(),
- # to save having to call a separate method.
- self._started_processing(servlet_name)
- yield
- self._finished_processing()
-
class XForwardedForRequest(SynapseRequest):
def __init__(self, *args, **kw):
|