diff --git a/synapse/http/__init__.py b/synapse/http/__init__.py
index bfebb0f644..054372e179 100644
--- a/synapse/http/__init__.py
+++ b/synapse/http/__init__.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,3 +13,24 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+from twisted.internet.defer import CancelledError
+from twisted.python import failure
+
+from synapse.api.errors import SynapseError
+
+
+class RequestTimedOutError(SynapseError):
+ """Exception representing timeout of an outbound request"""
+ def __init__(self):
+ super(RequestTimedOutError, self).__init__(504, "Timed out")
+
+
+def cancelled_to_request_timed_out_error(value, timeout):
+ """Turns CancelledErrors into RequestTimedOutErrors.
+
+ For use with async.add_timeout_to_deferred
+ """
+ if isinstance(value, failure.Failure):
+ value.trap(CancelledError)
+ raise RequestTimedOutError()
+ return value
diff --git a/synapse/http/additional_resource.py b/synapse/http/additional_resource.py
index 343e932cb1..a797396ade 100644
--- a/synapse/http/additional_resource.py
+++ b/synapse/http/additional_resource.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.http.server import wrap_request_handler
+from synapse.http.server import wrap_json_request_handler
from twisted.web.resource import Resource
from twisted.web.server import NOT_DONE_YET
@@ -42,14 +42,13 @@ class AdditionalResource(Resource):
Resource.__init__(self)
self._handler = handler
- # these are required by the request_handler wrapper
- self.version_string = hs.version_string
+ # required by the request_handler wrapper
self.clock = hs.get_clock()
def render(self, request):
self._async_render(request)
return NOT_DONE_YET
- @wrap_request_handler
+ @wrap_json_request_handler
def _async_render(self, request):
return self._handler(request)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index f3e4973c2e..4d4eee3d64 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -18,10 +19,10 @@ from OpenSSL.SSL import VERIFY_NONE
from synapse.api.errors import (
CodeMessageException, MatrixCodeMessageException, SynapseError, Codes,
)
+from synapse.http import cancelled_to_request_timed_out_error
+from synapse.util.async import add_timeout_to_deferred
from synapse.util.caches import CACHE_SIZE_FACTOR
from synapse.util.logcontext import make_deferred_yieldable
-from synapse.util import logcontext
-import synapse.metrics
from synapse.http.endpoint import SpiderEndpoint
from canonicaljson import encode_canonical_json
@@ -38,8 +39,9 @@ from twisted.web.http import PotentialDataLoss
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
-from StringIO import StringIO
+from six import StringIO
+from prometheus_client import Counter
import simplejson as json
import logging
import urllib
@@ -47,16 +49,9 @@ import urllib
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-outgoing_requests_counter = metrics.register_counter(
- "requests",
- labels=["method"],
-)
-incoming_responses_counter = metrics.register_counter(
- "responses",
- labels=["method", "code"],
-)
+outgoing_requests_counter = Counter("synapse_http_client_requests", "", ["method"])
+incoming_responses_counter = Counter("synapse_http_client_responses", "",
+ ["method", "code"])
class SimpleHttpClient(object):
@@ -93,32 +88,28 @@ class SimpleHttpClient(object):
def request(self, method, uri, *args, **kwargs):
# A small wrapper around self.agent.request() so we can easily attach
# counters to it
- outgoing_requests_counter.inc(method)
+ outgoing_requests_counter.labels(method).inc()
- def send_request():
+ logger.info("Sending request %s %s", method, uri)
+
+ try:
request_deferred = self.agent.request(
method, uri, *args, **kwargs
)
-
- return self.clock.time_bound_deferred(
+ add_timeout_to_deferred(
request_deferred,
- time_out=60,
+ 60, cancelled_to_request_timed_out_error,
)
+ response = yield make_deferred_yieldable(request_deferred)
- logger.info("Sending request %s %s", method, uri)
-
- try:
- with logcontext.PreserveLoggingContext():
- response = yield send_request()
-
- incoming_responses_counter.inc(method, response.code)
+ incoming_responses_counter.labels(method, response.code).inc()
logger.info(
"Received response to %s %s: %s",
method, uri, response.code
)
defer.returnValue(response)
except Exception as e:
- incoming_responses_counter.inc(method, "ERR")
+ incoming_responses_counter.labels(method, "ERR").inc()
logger.info(
"Error sending request to %s %s: %s %s",
method, uri, type(e).__name__, e.message
@@ -509,7 +500,7 @@ class SpiderHttpClient(SimpleHttpClient):
reactor,
SpiderEndpointFactory(hs)
)
- ), [('gzip', GzipDecoder)]
+ ), [(b'gzip', GzipDecoder)]
)
# We could look like Chrome:
# self.user_agent = ("Mozilla/5.0 (%s) (KHTML, like Gecko)
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 87639b9151..87a482650d 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -12,8 +12,6 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import socket
-
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
from twisted.internet import defer, reactor
from twisted.internet.error import ConnectError
@@ -33,7 +31,7 @@ SERVER_CACHE = {}
# our record of an individual server which can be tried to reach a destination.
#
-# "host" is actually a dotted-quad or ipv6 address string. Except when there's
+# "host" is the hostname acquired from the SRV record. Except when there's
# no SRV record, in which case it is the original hostname.
_Server = collections.namedtuple(
"_Server", "priority weight host port expires"
@@ -117,10 +115,15 @@ class _WrappedConnection(object):
if time.time() - self.last_request >= 2.5 * 60:
self.abort()
# Abort the underlying TLS connection. The abort() method calls
- # loseConnection() on the underlying TLS connection which tries to
+ # loseConnection() on the TLS connection which tries to
# shutdown the connection cleanly. We call abortConnection()
- # since that will promptly close the underlying TCP connection.
- self.transport.abortConnection()
+ # since that will promptly close the TLS connection.
+ #
+ # In Twisted >18.4; the TLS connection will be None if it has closed
+ # which will make abortConnection() throw. Check that the TLS connection
+ # is not None before trying to close it.
+ if self.transport.getHandle() is not None:
+ self.transport.abortConnection()
def request(self, request):
self.last_request = time.time()
@@ -288,7 +291,7 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
if (len(answers) == 1
and answers[0].type == dns.SRV
and answers[0].payload
- and answers[0].payload.target == dns.Name('.')):
+ and answers[0].payload.target == dns.Name(b'.')):
raise ConnectError("Service %s unavailable" % service_name)
for answer in answers:
@@ -297,20 +300,13 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
payload = answer.payload
- hosts = yield _get_hosts_for_srv_record(
- dns_client, str(payload.target)
- )
-
- for (ip, ttl) in hosts:
- host_ttl = min(answer.ttl, ttl)
-
- servers.append(_Server(
- host=ip,
- port=int(payload.port),
- priority=int(payload.priority),
- weight=int(payload.weight),
- expires=int(clock.time()) + host_ttl,
- ))
+ servers.append(_Server(
+ host=str(payload.target),
+ port=int(payload.port),
+ priority=int(payload.priority),
+ weight=int(payload.weight),
+ expires=int(clock.time()) + answer.ttl,
+ ))
servers.sort()
cache[service_name] = list(servers)
@@ -328,81 +324,3 @@ def resolve_service(service_name, dns_client=client, cache=SERVER_CACHE, clock=t
raise e
defer.returnValue(servers)
-
-
-@defer.inlineCallbacks
-def _get_hosts_for_srv_record(dns_client, host):
- """Look up each of the hosts in a SRV record
-
- Args:
- dns_client (twisted.names.dns.IResolver):
- host (basestring): host to look up
-
- Returns:
- Deferred[list[(str, int)]]: a list of (host, ttl) pairs
-
- """
- ip4_servers = []
- ip6_servers = []
-
- def cb(res):
- # lookupAddress and lookupIP6Address return a three-tuple
- # giving the answer, authority, and additional sections of the
- # response.
- #
- # we only care about the answers.
-
- return res[0]
-
- def eb(res, record_type):
- if res.check(DNSNameError):
- return []
- logger.warn("Error looking up %s for %s: %s", record_type, host, res)
- return res
-
- # no logcontexts here, so we can safely fire these off and gatherResults
- d1 = dns_client.lookupAddress(host).addCallbacks(
- cb, eb, errbackArgs=("A", ))
- d2 = dns_client.lookupIPV6Address(host).addCallbacks(
- cb, eb, errbackArgs=("AAAA", ))
- results = yield defer.DeferredList(
- [d1, d2], consumeErrors=True)
-
- # if all of the lookups failed, raise an exception rather than blowing out
- # the cache with an empty result.
- if results and all(s == defer.FAILURE for (s, _) in results):
- defer.returnValue(results[0][1])
-
- for (success, result) in results:
- if success == defer.FAILURE:
- continue
-
- for answer in result:
- if not answer.payload:
- continue
-
- try:
- if answer.type == dns.A:
- ip = answer.payload.dottedQuad()
- ip4_servers.append((ip, answer.ttl))
- elif answer.type == dns.AAAA:
- ip = socket.inet_ntop(
- socket.AF_INET6, answer.payload.address,
- )
- ip6_servers.append((ip, answer.ttl))
- else:
- # the most likely candidate here is a CNAME record.
- # rfc2782 says srvs may not point to aliases.
- logger.warn(
- "Ignoring unexpected DNS record type %s for %s",
- answer.type, host,
- )
- continue
- except Exception as e:
- logger.warn("Ignoring invalid DNS response for %s: %s",
- host, e)
- continue
-
- # keep the ipv4 results before the ipv6 results, mostly to match historical
- # behaviour.
- defer.returnValue(ip4_servers + ip6_servers)
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 9145405cb0..821aed362b 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,17 +13,19 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-import synapse.util.retryutils
from twisted.internet import defer, reactor, protocol
from twisted.internet.error import DNSLookupError
from twisted.web.client import readBody, HTTPConnectionPool, Agent
from twisted.web.http_headers import Headers
from twisted.web._newclient import ResponseDone
+from synapse.http import cancelled_to_request_timed_out_error
from synapse.http.endpoint import matrix_federation_endpoint
-from synapse.util.async import sleep
-from synapse.util import logcontext
import synapse.metrics
+from synapse.util.async import sleep, add_timeout_to_deferred
+from synapse.util import logcontext
+from synapse.util.logcontext import make_deferred_yieldable
+import synapse.util.retryutils
from canonicaljson import encode_canonical_json
@@ -38,22 +41,19 @@ import logging
import random
import sys
import urllib
-import urlparse
+from six.moves.urllib import parse as urlparse
+from six import string_types
+
+from prometheus_client import Counter
logger = logging.getLogger(__name__)
outbound_logger = logging.getLogger("synapse.http.outbound")
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-outgoing_requests_counter = metrics.register_counter(
- "requests",
- labels=["method"],
-)
-incoming_responses_counter = metrics.register_counter(
- "responses",
- labels=["method", "code"],
-)
+outgoing_requests_counter = Counter("synapse_http_matrixfederationclient_requests",
+ "", ["method"])
+incoming_responses_counter = Counter("synapse_http_matrixfederationclient_responses",
+ "", ["method", "code"])
MAX_LONG_RETRIES = 10
@@ -184,21 +184,20 @@ class MatrixFederationHttpClient(object):
producer = body_callback(method, http_url_bytes, headers_dict)
try:
- def send_request():
- request_deferred = self.agent.request(
- method,
- url_bytes,
- Headers(headers_dict),
- producer
- )
-
- return self.clock.time_bound_deferred(
- request_deferred,
- time_out=timeout / 1000. if timeout else 60,
- )
-
- with logcontext.PreserveLoggingContext():
- response = yield send_request()
+ request_deferred = self.agent.request(
+ method,
+ url_bytes,
+ Headers(headers_dict),
+ producer
+ )
+ add_timeout_to_deferred(
+ request_deferred,
+ timeout / 1000. if timeout else 60,
+ cancelled_to_request_timed_out_error,
+ )
+ response = yield make_deferred_yieldable(
+ request_deferred,
+ )
log_result = "%d %s" % (response.code, response.phrase,)
break
@@ -286,7 +285,8 @@ class MatrixFederationHttpClient(object):
headers_dict[b"Authorization"] = auth_headers
@defer.inlineCallbacks
- def put_json(self, destination, path, data={}, json_data_callback=None,
+ def put_json(self, destination, path, args={}, data={},
+ json_data_callback=None,
long_retries=False, timeout=None,
ignore_backoff=False,
backoff_on_404=False):
@@ -296,6 +296,7 @@ class MatrixFederationHttpClient(object):
destination (str): The remote server to send the HTTP request
to.
path (str): The HTTP path.
+ args (dict): query params
data (dict): A dict containing the data that will be used as
the request body. This will be encoded as JSON.
json_data_callback (callable): A callable returning the dict to
@@ -342,6 +343,7 @@ class MatrixFederationHttpClient(object):
path,
body_callback=body_callback,
headers_dict={"Content-Type": ["application/json"]},
+ query_bytes=encode_query_args(args),
long_retries=long_retries,
timeout=timeout,
ignore_backoff=ignore_backoff,
@@ -373,6 +375,7 @@ class MatrixFederationHttpClient(object):
giving up. None indicates no timeout.
ignore_backoff (bool): true to ignore the historical backoff data and
try the request anyway.
+ args (dict): query params
Returns:
Deferred: Succeeds when we get a 2xx HTTP response. The result
will be the decoded JSON body.
@@ -548,7 +551,7 @@ class MatrixFederationHttpClient(object):
encoded_args = {}
for k, vs in args.items():
- if isinstance(vs, basestring):
+ if isinstance(vs, string_types):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
@@ -663,7 +666,7 @@ def check_content_type_is_json(headers):
RuntimeError if the
"""
- c_type = headers.getRawHeaders("Content-Type")
+ c_type = headers.getRawHeaders(b"Content-Type")
if c_type is None:
raise RuntimeError(
"No Content-Type header"
@@ -680,7 +683,7 @@ def check_content_type_is_json(headers):
def encode_query_args(args):
encoded_args = {}
for k, vs in args.items():
- if isinstance(vs, basestring):
+ if isinstance(vs, string_types):
vs = [vs]
encoded_args[k] = [v.encode("UTF-8") for v in vs]
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
new file mode 100644
index 0000000000..dc06f6c443
--- /dev/null
+++ b/synapse/http/request_metrics.py
@@ -0,0 +1,273 @@
+# -*- coding: utf-8 -*-
+# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from prometheus_client.core import Counter, Histogram
+from synapse.metrics import LaterGauge
+
+from synapse.util.logcontext import LoggingContext
+
+logger = logging.getLogger(__name__)
+
+
+# total number of responses served, split by method/servlet/tag
+response_count = Counter(
+ "synapse_http_server_response_count", "", ["method", "servlet", "tag"]
+)
+
+requests_counter = Counter(
+ "synapse_http_server_requests_received", "", ["method", "servlet"]
+)
+
+outgoing_responses_counter = Counter(
+ "synapse_http_server_responses", "", ["method", "code"]
+)
+
+response_timer = Histogram(
+ "synapse_http_server_response_time_seconds", "sec", ["method", "servlet", "tag"]
+)
+
+response_ru_utime = Counter(
+ "synapse_http_server_response_ru_utime_seconds", "sec", ["method", "servlet", "tag"]
+)
+
+response_ru_stime = Counter(
+ "synapse_http_server_response_ru_stime_seconds", "sec", ["method", "servlet", "tag"]
+)
+
+response_db_txn_count = Counter(
+ "synapse_http_server_response_db_txn_count", "", ["method", "servlet", "tag"]
+)
+
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
+response_db_txn_duration = Counter(
+ "synapse_http_server_response_db_txn_duration_seconds",
+ "",
+ ["method", "servlet", "tag"],
+)
+
+# seconds spent waiting for a db connection, when processing this request
+response_db_sched_duration = Counter(
+ "synapse_http_server_response_db_sched_duration_seconds",
+ "",
+ ["method", "servlet", "tag"],
+)
+
+# size in bytes of the response written
+response_size = Counter(
+ "synapse_http_server_response_size", "", ["method", "servlet", "tag"]
+)
+
+# In flight metrics are incremented while the requests are in flight, rather
+# than when the response was written.
+
+in_flight_requests_ru_utime = Counter(
+ "synapse_http_server_in_flight_requests_ru_utime_seconds",
+ "",
+ ["method", "servlet"],
+)
+
+in_flight_requests_ru_stime = Counter(
+ "synapse_http_server_in_flight_requests_ru_stime_seconds",
+ "",
+ ["method", "servlet"],
+)
+
+in_flight_requests_db_txn_count = Counter(
+ "synapse_http_server_in_flight_requests_db_txn_count", "", ["method", "servlet"]
+)
+
+# seconds spent waiting for db txns, excluding scheduling time, when processing
+# this request
+in_flight_requests_db_txn_duration = Counter(
+ "synapse_http_server_in_flight_requests_db_txn_duration_seconds",
+ "",
+ ["method", "servlet"],
+)
+
+# seconds spent waiting for a db connection, when processing this request
+in_flight_requests_db_sched_duration = Counter(
+ "synapse_http_server_in_flight_requests_db_sched_duration_seconds",
+ "",
+ ["method", "servlet"],
+)
+
+# The set of all in flight requests, set[RequestMetrics]
+_in_flight_requests = set()
+
+
+def _get_in_flight_counts():
+ """Returns a count of all in flight requests by (method, server_name)
+
+ Returns:
+ dict[tuple[str, str], int]
+ """
+ for rm in _in_flight_requests:
+ rm.update_metrics()
+
+ # Map from (method, name) -> int, the number of in flight requests of that
+ # type
+ counts = {}
+ for rm in _in_flight_requests:
+ key = (rm.method, rm.name,)
+ counts[key] = counts.get(key, 0) + 1
+
+ return counts
+
+
+LaterGauge(
+ "synapse_http_request_metrics_in_flight_requests_count",
+ "",
+ ["method", "servlet"],
+ _get_in_flight_counts,
+)
+
+
+class RequestMetrics(object):
+ def start(self, time_sec, name, method):
+ self.start = time_sec
+ self.start_context = LoggingContext.current_context()
+ self.name = name
+ self.method = method
+
+ self._request_stats = _RequestStats.from_context(self.start_context)
+
+ _in_flight_requests.add(self)
+
+ def stop(self, time_sec, request):
+ _in_flight_requests.discard(self)
+
+ context = LoggingContext.current_context()
+
+ tag = ""
+ if context:
+ tag = context.tag
+
+ if context != self.start_context:
+ logger.warn(
+ "Context have unexpectedly changed %r, %r",
+ context, self.start_context
+ )
+ return
+
+ outgoing_responses_counter.labels(request.method, str(request.code)).inc()
+
+ response_count.labels(request.method, self.name, tag).inc()
+
+ response_timer.labels(request.method, self.name, tag).observe(
+ time_sec - self.start
+ )
+
+ ru_utime, ru_stime = context.get_resource_usage()
+
+ response_ru_utime.labels(request.method, self.name, tag).inc(ru_utime)
+ response_ru_stime.labels(request.method, self.name, tag).inc(ru_stime)
+ response_db_txn_count.labels(request.method, self.name, tag).inc(
+ context.db_txn_count
+ )
+ response_db_txn_duration.labels(request.method, self.name, tag).inc(
+ context.db_txn_duration_sec
+ )
+ response_db_sched_duration.labels(request.method, self.name, tag).inc(
+ context.db_sched_duration_sec
+ )
+
+ response_size.labels(request.method, self.name, tag).inc(request.sentLength)
+
+ # We always call this at the end to ensure that we update the metrics
+ # regardless of whether a call to /metrics while the request was in
+ # flight.
+ self.update_metrics()
+
+ def update_metrics(self):
+ """Updates the in flight metrics with values from this request.
+ """
+ diff = self._request_stats.update(self.start_context)
+
+ in_flight_requests_ru_utime.labels(self.method, self.name).inc(diff.ru_utime)
+ in_flight_requests_ru_stime.labels(self.method, self.name).inc(diff.ru_stime)
+
+ in_flight_requests_db_txn_count.labels(self.method, self.name).inc(
+ diff.db_txn_count
+ )
+
+ in_flight_requests_db_txn_duration.labels(self.method, self.name).inc(
+ diff.db_txn_duration_sec
+ )
+
+ in_flight_requests_db_sched_duration.labels(self.method, self.name).inc(
+ diff.db_sched_duration_sec
+ )
+
+
+class _RequestStats(object):
+ """Keeps tracks of various metrics for an in flight request.
+ """
+
+ __slots__ = [
+ "ru_utime",
+ "ru_stime",
+ "db_txn_count",
+ "db_txn_duration_sec",
+ "db_sched_duration_sec",
+ ]
+
+ def __init__(
+ self, ru_utime, ru_stime, db_txn_count, db_txn_duration_sec, db_sched_duration_sec
+ ):
+ self.ru_utime = ru_utime
+ self.ru_stime = ru_stime
+ self.db_txn_count = db_txn_count
+ self.db_txn_duration_sec = db_txn_duration_sec
+ self.db_sched_duration_sec = db_sched_duration_sec
+
+ @staticmethod
+ def from_context(context):
+ ru_utime, ru_stime = context.get_resource_usage()
+
+ return _RequestStats(
+ ru_utime, ru_stime,
+ context.db_txn_count,
+ context.db_txn_duration_sec,
+ context.db_sched_duration_sec,
+ )
+
+ def update(self, context):
+ """Updates the current values and returns the difference between the
+ old and new values.
+
+ Returns:
+ _RequestStats: The difference between the old and new values
+ """
+ new = _RequestStats.from_context(context)
+
+ diff = _RequestStats(
+ new.ru_utime - self.ru_utime,
+ new.ru_stime - self.ru_stime,
+ new.db_txn_count - self.db_txn_count,
+ new.db_txn_duration_sec - self.db_txn_duration_sec,
+ new.db_sched_duration_sec - self.db_sched_duration_sec,
+ )
+
+ self.ru_utime = new.ru_utime
+ self.ru_stime = new.ru_stime
+ self.db_txn_count = new.db_txn_count
+ self.db_txn_duration_sec = new.db_txn_duration_sec
+ self.db_sched_duration_sec = new.db_sched_duration_sec
+
+ return diff
diff --git a/synapse/http/server.py b/synapse/http/server.py
index e64aa92729..bc09b8b2be 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -13,11 +13,15 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import cgi
+from six.moves import http_client
from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError, Codes
)
+from synapse.http.request_metrics import (
+ requests_counter,
+)
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
from synapse.util.caches import intern_dict
from synapse.util.metrics import Measure
@@ -41,173 +45,174 @@ import simplejson
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-# total number of responses served, split by method/servlet/tag
-response_count = metrics.register_counter(
- "response_count",
- labels=["method", "servlet", "tag"],
- alternative_names=(
- # the following are all deprecated aliases for the same metric
- metrics.name_prefix + x for x in (
- "_requests",
- "_response_time:count",
- "_response_ru_utime:count",
- "_response_ru_stime:count",
- "_response_db_txn_count:count",
- "_response_db_txn_duration:count",
- )
- )
-)
+HTML_ERROR_TEMPLATE = """<!DOCTYPE html>
+<html lang=en>
+ <head>
+ <meta charset="utf-8">
+ <title>Error {code}</title>
+ </head>
+ <body>
+ <p>{msg}</p>
+ </body>
+</html>
+"""
-requests_counter = metrics.register_counter(
- "requests_received",
- labels=["method", "servlet", ],
-)
-outgoing_responses_counter = metrics.register_counter(
- "responses",
- labels=["method", "code"],
-)
+def wrap_json_request_handler(h):
+ """Wraps a request handler method with exception handling.
-response_timer = metrics.register_counter(
- "response_time_seconds",
- labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_time:total",
- ),
-)
+ Also adds logging as per wrap_request_handler_with_logging.
-response_ru_utime = metrics.register_counter(
- "response_ru_utime_seconds", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_ru_utime:total",
- ),
-)
+ The handler method must have a signature of "handle_foo(self, request)",
+ where "self" must have a "clock" attribute (and "request" must be a
+ SynapseRequest).
-response_ru_stime = metrics.register_counter(
- "response_ru_stime_seconds", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_ru_stime:total",
- ),
-)
+ The handler must return a deferred. If the deferred succeeds we assume that
+ a response has been sent. If the deferred fails with a SynapseError we use
+ it to send a JSON response with the appropriate HTTP reponse code. If the
+ deferred fails with any other type of error we send a 500 reponse.
+ """
-response_db_txn_count = metrics.register_counter(
- "response_db_txn_count", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_db_txn_count:total",
- ),
-)
+ @defer.inlineCallbacks
+ def wrapped_request_handler(self, request):
+ try:
+ yield h(self, request)
+ except CodeMessageException as e:
+ code = e.code
+ if isinstance(e, SynapseError):
+ logger.info(
+ "%s SynapseError: %s - %s", request, code, e.msg
+ )
+ else:
+ logger.exception(e)
+ respond_with_json(
+ request, code, cs_exception(e), send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
-# seconds spent waiting for db txns, excluding scheduling time, when processing
-# this request
-response_db_txn_duration = metrics.register_counter(
- "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"],
- alternative_names=(
- metrics.name_prefix + "_response_db_txn_duration:total",
- ),
-)
+ except Exception:
+ # failure.Failure() fishes the original Failure out
+ # of our stack, and thus gives us a sensible stack
+ # trace.
+ f = failure.Failure()
+ logger.error(
+ "Failed handle request via %r: %r: %s",
+ h,
+ request,
+ f.getTraceback().rstrip(),
+ )
+ respond_with_json(
+ request,
+ 500,
+ {
+ "error": "Internal server error",
+ "errcode": Codes.UNKNOWN,
+ },
+ send_cors=True,
+ pretty_print=_request_user_agent_is_curl(request),
+ )
-# seconds spent waiting for a db connection, when processing this request
-response_db_sched_duration = metrics.register_counter(
- "response_db_sched_duration_seconds", labels=["method", "servlet", "tag"]
-)
+ return wrap_request_handler_with_logging(wrapped_request_handler)
-_next_request_id = 0
+def wrap_html_request_handler(h):
+ """Wraps a request handler method with exception handling.
-def request_handler(include_metrics=False):
- """Decorator for ``wrap_request_handler``"""
- return lambda request_handler: wrap_request_handler(request_handler, include_metrics)
+ Also adds logging as per wrap_request_handler_with_logging.
+ The handler method must have a signature of "handle_foo(self, request)",
+ where "self" must have a "clock" attribute (and "request" must be a
+ SynapseRequest).
+ """
+ def wrapped_request_handler(self, request):
+ d = defer.maybeDeferred(h, self, request)
+ d.addErrback(_return_html_error, request)
+ return d
-def wrap_request_handler(request_handler, include_metrics=False):
- """Wraps a method that acts as a request handler with the necessary logging
- and exception handling.
+ return wrap_request_handler_with_logging(wrapped_request_handler)
- The method must have a signature of "handle_foo(self, request)". The
- argument "self" must have "version_string" and "clock" attributes. The
- argument "request" must be a twisted HTTP request.
- The method must return a deferred. If the deferred succeeds we assume that
- a response has been sent. If the deferred fails with a SynapseError we use
- it to send a JSON response with the appropriate HTTP reponse code. If the
- deferred fails with any other type of error we send a 500 reponse.
+def _return_html_error(f, request):
+ """Sends an HTML error page corresponding to the given failure
- We insert a unique request-id into the logging context for this request and
- log the response and duration for this request.
+ Args:
+ f (twisted.python.failure.Failure):
+ request (twisted.web.iweb.IRequest):
"""
+ if f.check(CodeMessageException):
+ cme = f.value
+ code = cme.code
+ msg = cme.msg
+
+ if isinstance(cme, SynapseError):
+ logger.info(
+ "%s SynapseError: %s - %s", request, code, msg
+ )
+ else:
+ logger.error(
+ "Failed handle request %r: %s",
+ request,
+ f.getTraceback().rstrip(),
+ )
+ else:
+ code = http_client.INTERNAL_SERVER_ERROR
+ msg = "Internal server error"
+
+ logger.error(
+ "Failed handle request %r: %s",
+ request,
+ f.getTraceback().rstrip(),
+ )
+
+ body = HTML_ERROR_TEMPLATE.format(
+ code=code, msg=cgi.escape(msg),
+ ).encode("utf-8")
+ request.setResponseCode(code)
+ request.setHeader(b"Content-Type", b"text/html; charset=utf-8")
+ request.setHeader(b"Content-Length", b"%i" % (len(body),))
+ request.write(body)
+ finish_request(request)
+
+
+def wrap_request_handler_with_logging(h):
+ """Wraps a request handler to provide logging and metrics
+ The handler method must have a signature of "handle_foo(self, request)",
+ where "self" must have a "clock" attribute (and "request" must be a
+ SynapseRequest).
+
+ As well as calling `request.processing` (which will log the response and
+ duration for this request), the wrapped request handler will insert the
+ request id into the logging context.
+ """
@defer.inlineCallbacks
def wrapped_request_handler(self, request):
- global _next_request_id
- request_id = "%s-%s" % (request.method, _next_request_id)
- _next_request_id += 1
+ """
+ Args:
+ self:
+ request (synapse.http.site.SynapseRequest):
+ """
+ request_id = request.get_request_id()
with LoggingContext(request_id) as request_context:
+ request_context.request = request_id
with Measure(self.clock, "wrapped_request_handler"):
- request_metrics = RequestMetrics()
# we start the request metrics timer here with an initial stab
# at the servlet name. For most requests that name will be
# JsonResource (or a subclass), and JsonResource._async_render
# will update it once it picks a servlet.
servlet_name = self.__class__.__name__
- request_metrics.start(self.clock, name=servlet_name)
-
- request_context.request = request_id
- with request.processing():
- try:
- with PreserveLoggingContext(request_context):
- if include_metrics:
- yield request_handler(self, request, request_metrics)
- else:
- requests_counter.inc(request.method, servlet_name)
- yield request_handler(self, request)
- except CodeMessageException as e:
- code = e.code
- if isinstance(e, SynapseError):
- logger.info(
- "%s SynapseError: %s - %s", request, code, e.msg
- )
- else:
- logger.exception(e)
- outgoing_responses_counter.inc(request.method, str(code))
- respond_with_json(
- request, code, cs_exception(e), send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- )
- except Exception:
- # failure.Failure() fishes the original Failure out
- # of our stack, and thus gives us a sensible stack
- # trace.
- f = failure.Failure()
- logger.error(
- "Failed handle request %s.%s on %r: %r: %s",
- request_handler.__module__,
- request_handler.__name__,
- self,
- request,
- f.getTraceback().rstrip(),
- )
- respond_with_json(
- request,
- 500,
- {
- "error": "Internal server error",
- "errcode": Codes.UNKNOWN,
- },
- send_cors=True,
- pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
- )
- finally:
- try:
- request_metrics.stop(
- self.clock, request
- )
- except Exception as e:
- logger.warn("Failed to stop metrics: %r", e)
+ with request.processing(servlet_name):
+ with PreserveLoggingContext(request_context):
+ d = defer.maybeDeferred(h, self, request)
+
+ # record the arrival of the request *after*
+ # dispatching to the handler, so that the handler
+ # can update the servlet name in the request
+ # metrics
+ requests_counter.labels(request.method,
+ request.request_metrics.name).inc()
+ yield d
return wrapped_request_handler
@@ -257,7 +262,6 @@ class JsonResource(HttpServer, resource.Resource):
self.canonical_json = canonical_json
self.clock = hs.get_clock()
self.path_regexs = {}
- self.version_string = hs.version_string
self.hs = hs
def register_paths(self, method, path_patterns, callback):
@@ -273,13 +277,9 @@ class JsonResource(HttpServer, resource.Resource):
self._async_render(request)
return server.NOT_DONE_YET
- # Disable metric reporting because _async_render does its own metrics.
- # It does its own metric reporting because _async_render dispatches to
- # a callback and it's the class name of that callback we want to report
- # against rather than the JsonResource itself.
- @request_handler(include_metrics=True)
+ @wrap_json_request_handler
@defer.inlineCallbacks
- def _async_render(self, request, request_metrics):
+ def _async_render(self, request):
""" This gets called from render() every time someone sends us a request.
This checks if anyone has registered a callback for that method and
path.
@@ -291,9 +291,7 @@ class JsonResource(HttpServer, resource.Resource):
servlet_classname = servlet_instance.__class__.__name__
else:
servlet_classname = "%r" % callback
-
- request_metrics.name = servlet_classname
- requests_counter.inc(request.method, servlet_classname)
+ request.request_metrics.name = servlet_classname
# Now trigger the callback. If it returns a response, we send it
# here. If it throws an exception, that is handled by the wrapper
@@ -324,7 +322,7 @@ class JsonResource(HttpServer, resource.Resource):
register_paths, so will return (possibly via Deferred) either
None, or a tuple of (http code, response body).
"""
- if request.method == "OPTIONS":
+ if request.method == b"OPTIONS":
return _options_handler, {}
# Loop through all the registered callbacks to check if the method
@@ -340,15 +338,12 @@ class JsonResource(HttpServer, resource.Resource):
def _send_response(self, request, code, response_json_object,
response_code_message=None):
- outgoing_responses_counter.inc(request.method, str(code))
-
# TODO: Only enable CORS for the requests that need it.
respond_with_json(
request, code, response_json_object,
send_cors=True,
response_code_message=response_code_message,
pretty_print=_request_user_agent_is_curl(request),
- version_string=self.version_string,
canonical_json=self.canonical_json,
)
@@ -381,52 +376,6 @@ def _unrecognised_request_handler(request):
raise UnrecognizedRequestError()
-class RequestMetrics(object):
- def start(self, clock, name):
- self.start = clock.time_msec()
- self.start_context = LoggingContext.current_context()
- self.name = name
-
- def stop(self, clock, request):
- context = LoggingContext.current_context()
-
- tag = ""
- if context:
- tag = context.tag
-
- if context != self.start_context:
- logger.warn(
- "Context have unexpectedly changed %r, %r",
- context, self.start_context
- )
- return
-
- response_count.inc(request.method, self.name, tag)
-
- response_timer.inc_by(
- clock.time_msec() - self.start, request.method,
- self.name, tag
- )
-
- ru_utime, ru_stime = context.get_resource_usage()
-
- response_ru_utime.inc_by(
- ru_utime, request.method, self.name, tag
- )
- response_ru_stime.inc_by(
- ru_stime, request.method, self.name, tag
- )
- response_db_txn_count.inc_by(
- context.db_txn_count, request.method, self.name, tag
- )
- response_db_txn_duration.inc_by(
- context.db_txn_duration_ms / 1000., request.method, self.name, tag
- )
- response_db_sched_duration.inc_by(
- context.db_sched_duration_ms / 1000., request.method, self.name, tag
- )
-
-
class RootRedirect(resource.Resource):
"""Redirects the root '/' path to another path."""
@@ -445,7 +394,7 @@ class RootRedirect(resource.Resource):
def respond_with_json(request, code, json_object, send_cors=False,
response_code_message=None, pretty_print=False,
- version_string="", canonical_json=True):
+ canonical_json=True):
# could alternatively use request.notifyFinish() and flip a flag when
# the Deferred fires, but since the flag is RIGHT THERE it seems like
# a waste.
@@ -461,19 +410,17 @@ def respond_with_json(request, code, json_object, send_cors=False,
if canonical_json or synapse.events.USE_FROZEN_DICTS:
json_bytes = encode_canonical_json(json_object)
else:
- # ujson doesn't like frozen_dicts.
json_bytes = simplejson.dumps(json_object)
return respond_with_json_bytes(
request, code, json_bytes,
send_cors=send_cors,
response_code_message=response_code_message,
- version_string=version_string
)
def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
- version_string="", response_code_message=None):
+ response_code_message=None):
"""Sends encoded JSON in response to the given request.
Args:
@@ -487,8 +434,8 @@ def respond_with_json_bytes(request, code, json_bytes, send_cors=False,
request.setResponseCode(code, message=response_code_message)
request.setHeader(b"Content-Type", b"application/json")
- request.setHeader(b"Server", version_string)
request.setHeader(b"Content-Length", b"%d" % (len(json_bytes),))
+ request.setHeader(b"Cache-Control", b"no-cache, no-store, must-revalidate")
if send_cors:
set_cors_headers(request)
@@ -536,9 +483,9 @@ def finish_request(request):
def _request_user_agent_is_curl(request):
user_agents = request.requestHeaders.getRawHeaders(
- "User-Agent", default=[]
+ b"User-Agent", default=[]
)
for user_agent in user_agents:
- if "curl" in user_agent:
+ if b"curl" in user_agent:
return True
return False
diff --git a/synapse/http/site.py b/synapse/http/site.py
index e422c8dfae..60299657b9 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -12,27 +12,51 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.util.logcontext import LoggingContext
-from twisted.web.server import Site, Request
-
import contextlib
import logging
import re
import time
-ACCESS_TOKEN_RE = re.compile(r'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+from twisted.web.server import Site, Request
+
+from synapse.http.request_metrics import RequestMetrics
+from synapse.util.logcontext import LoggingContext
+
+logger = logging.getLogger(__name__)
+
+ACCESS_TOKEN_RE = re.compile(br'(\?.*access(_|%5[Ff])token=)[^&]*(.*)$')
+
+_next_request_seq = 0
class SynapseRequest(Request):
+ """Class which encapsulates an HTTP request to synapse.
+
+ All of the requests processed in synapse are of this type.
+
+ It extends twisted's twisted.web.server.Request, and adds:
+ * Unique request ID
+ * Redaction of access_token query-params in __repr__
+ * Logging at start and end
+ * Metrics to record CPU, wallclock and DB time by endpoint.
+
+ It provides a method `processing` which should be called by the Resource
+ which is handling the request, and returns a context manager.
+
+ """
def __init__(self, site, *args, **kw):
Request.__init__(self, *args, **kw)
self.site = site
self.authenticated_entity = None
self.start_time = 0
+ global _next_request_seq
+ self.request_seq = _next_request_seq
+ _next_request_seq += 1
+
def __repr__(self):
# We overwrite this so that we don't log ``access_token``
- return '<%s at 0x%x method=%s uri=%s clientproto=%s site=%s>' % (
+ return '<%s at 0x%x method=%r uri=%r clientproto=%r site=%r>' % (
self.__class__.__name__,
id(self),
self.method,
@@ -41,16 +65,30 @@ class SynapseRequest(Request):
self.site.site_tag,
)
+ def get_request_id(self):
+ return "%s-%i" % (self.method, self.request_seq)
+
def get_redacted_uri(self):
return ACCESS_TOKEN_RE.sub(
- r'\1<redacted>\3',
+ br'\1<redacted>\3',
self.uri
)
def get_user_agent(self):
- return self.requestHeaders.getRawHeaders("User-Agent", [None])[-1]
+ return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
+
+ def render(self, resrc):
+ # override the Server header which is set by twisted
+ self.setHeader("Server", self.site.server_version_string)
+ return Request.render(self, resrc)
+
+ def _started_processing(self, servlet_name):
+ self.start_time = time.time()
+ self.request_metrics = RequestMetrics()
+ self.request_metrics.start(
+ self.start_time, name=servlet_name, method=self.method,
+ )
- def started_processing(self):
self.site.access_logger.info(
"%s - %s - Received request: %s %s",
self.getClientIP(),
@@ -58,32 +96,32 @@ class SynapseRequest(Request):
self.method,
self.get_redacted_uri()
)
- self.start_time = int(time.time() * 1000)
-
- def finished_processing(self):
+ def _finished_processing(self):
try:
context = LoggingContext.current_context()
ru_utime, ru_stime = context.get_resource_usage()
db_txn_count = context.db_txn_count
- db_txn_duration_ms = context.db_txn_duration_ms
- db_sched_duration_ms = context.db_sched_duration_ms
+ db_txn_duration_sec = context.db_txn_duration_sec
+ db_sched_duration_sec = context.db_sched_duration_sec
except Exception:
ru_utime, ru_stime = (0, 0)
- db_txn_count, db_txn_duration_ms = (0, 0)
+ db_txn_count, db_txn_duration_sec = (0, 0)
+
+ end_time = time.time()
self.site.access_logger.info(
"%s - %s - {%s}"
- " Processed request: %dms (%dms, %dms) (%dms/%dms/%d)"
+ " Processed request: %.3fsec (%.3fsec, %.3fsec) (%.3fsec/%.3fsec/%d)"
" %sB %s \"%s %s %s\" \"%s\"",
self.getClientIP(),
self.site.site_tag,
self.authenticated_entity,
- int(time.time() * 1000) - self.start_time,
- int(ru_utime * 1000),
- int(ru_stime * 1000),
- db_sched_duration_ms,
- db_txn_duration_ms,
+ end_time - self.start_time,
+ ru_utime,
+ ru_stime,
+ db_sched_duration_sec,
+ db_txn_duration_sec,
int(db_txn_count),
self.sentLength,
self.code,
@@ -93,11 +131,38 @@ class SynapseRequest(Request):
self.get_user_agent(),
)
+ try:
+ self.request_metrics.stop(end_time, self)
+ except Exception as e:
+ logger.warn("Failed to stop metrics: %r", e)
+
@contextlib.contextmanager
- def processing(self):
- self.started_processing()
+ def processing(self, servlet_name):
+ """Record the fact that we are processing this request.
+
+ Returns a context manager; the correct way to use this is:
+
+ @defer.inlineCallbacks
+ def handle_request(request):
+ with request.processing("FooServlet"):
+ yield really_handle_the_request()
+
+ This will log the request's arrival. Once the context manager is
+ closed, the completion of the request will be logged, and the various
+ metrics will be updated.
+
+ Args:
+ servlet_name (str): the name of the servlet which will be
+ processing this request. This is used in the metrics.
+
+ It is possible to update this afterwards by updating
+ self.request_metrics.servlet_name.
+ """
+ # TODO: we should probably just move this into render() and finish(),
+ # to save having to call a separate method.
+ self._started_processing(servlet_name)
yield
- self.finished_processing()
+ self._finished_processing()
class XForwardedForRequest(SynapseRequest):
@@ -135,7 +200,8 @@ class SynapseSite(Site):
Subclass of a twisted http Site that does access logging with python's
standard logging
"""
- def __init__(self, logger_name, site_tag, config, resource, *args, **kwargs):
+ def __init__(self, logger_name, site_tag, config, resource,
+ server_version_string, *args, **kwargs):
Site.__init__(self, resource, *args, **kwargs)
self.site_tag = site_tag
@@ -143,6 +209,7 @@ class SynapseSite(Site):
proxied = config.get("x_forwarded", False)
self.requestFactory = SynapseRequestFactory(self, proxied)
self.access_logger = logging.getLogger(logger_name)
+ self.server_version_string = server_version_string
def log(self, request):
pass
|