diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 3649406efb..aa7c722efc 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -47,6 +47,7 @@ from synapse.crypto import context_factory
from synapse.util.logcontext import LoggingContext
from synapse.rest.client.v1 import ClientV1RestResource
from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
+from synapse.metrics.resource import MetricsResource, METRICS_PREFIX
from daemonize import Daemonize
import twisted.manhole.telnet
@@ -100,6 +101,12 @@ class SynapseHomeServer(HomeServer):
def build_resource_for_server_key(self):
return LocalKey(self)
+ def build_resource_for_metrics(self):
+ if self.get_config().enable_metrics:
+ return MetricsResource(self)
+ else:
+ return None
+
def build_db_pool(self):
return adbapi.ConnectionPool(
"sqlite3", self.get_db_name(),
@@ -110,7 +117,7 @@ class SynapseHomeServer(HomeServer):
# so that :memory: sqlite works
)
- def create_resource_tree(self, web_client, redirect_root_to_web_client):
+ def create_resource_tree(self, redirect_root_to_web_client):
"""Create the resource tree for this Home Server.
        This is unduly complicated because Twisted does not support putting
@@ -122,6 +129,9 @@ class SynapseHomeServer(HomeServer):
location of the web client. This does nothing if web_client is not
True.
"""
+ config = self.get_config()
+ web_client = config.webclient
+
# list containing (path_str, Resource) e.g:
# [ ("/aaa/bbb/cc", Resource1), ("/aaa/dummy", Resource2) ]
desired_tree = [
@@ -145,6 +155,10 @@ class SynapseHomeServer(HomeServer):
else:
self.root_resource = Resource()
+ metrics_resource = self.get_resource_for_metrics()
+ if config.metrics_port is None and metrics_resource is not None:
+ desired_tree.append((METRICS_PREFIX, metrics_resource))
+
# ideally we'd just use getChild and putChild but getChild doesn't work
# unless you give it a Request object IN ADDITION to the name :/ So
# instead, we'll store a copy of this mapping so we can actually add
@@ -206,17 +220,27 @@ class SynapseHomeServer(HomeServer):
"""
return "%s-%s" % (resource, path_seg)
- def start_listening(self, secure_port, unsecure_port):
- if secure_port is not None:
+ def start_listening(self):
+ config = self.get_config()
+
+ if not config.no_tls and config.bind_port is not None:
reactor.listenSSL(
- secure_port, Site(self.root_resource), self.tls_context_factory
+ config.bind_port, Site(self.root_resource), self.tls_context_factory
)
- logger.info("Synapse now listening on port %d", secure_port)
- if unsecure_port is not None:
+ logger.info("Synapse now listening on port %d", config.bind_port)
+
+ if config.unsecure_port is not None:
reactor.listenTCP(
- unsecure_port, Site(self.root_resource)
+ config.unsecure_port, Site(self.root_resource)
)
- logger.info("Synapse now listening on port %d", unsecure_port)
+ logger.info("Synapse now listening on port %d", config.unsecure_port)
+
+ metrics_resource = self.get_resource_for_metrics()
+ if metrics_resource and config.metrics_port is not None:
+ reactor.listenTCP(
+ config.metrics_port, Site(metrics_resource), interface="127.0.0.1",
+ )
+ logger.info("Metrics now running on 127.0.0.1 port %d", config.metrics_port)
def get_version_string():
@@ -340,7 +364,6 @@ def setup(config_options):
)
hs.create_resource_tree(
- web_client=config.webclient,
redirect_root_to_web_client=True,
)
@@ -369,11 +392,7 @@ def setup(config_options):
f.namespace['hs'] = hs
reactor.listenTCP(config.manhole, f, interface='127.0.0.1')
- bind_port = config.bind_port
- if config.no_tls:
- bind_port = None
-
- hs.start_listening(bind_port, config.unsecure_port)
+ hs.start_listening()
hs.get_pusherpool().start()
hs.get_state_handler().start_caching()
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index c024535f52..241afdf872 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -23,11 +23,13 @@ from .captcha import CaptchaConfig
from .email import EmailConfig
from .voip import VoipConfig
from .registration import RegistrationConfig
+from .metrics import MetricsConfig
class HomeServerConfig(TlsConfig, ServerConfig, DatabaseConfig, LoggingConfig,
RatelimitConfig, ContentRepositoryConfig, CaptchaConfig,
- EmailConfig, VoipConfig, RegistrationConfig,):
+ EmailConfig, VoipConfig, RegistrationConfig,
+ MetricsConfig,):
pass
diff --git a/synapse/config/metrics.py b/synapse/config/metrics.py
new file mode 100644
index 0000000000..901a429c76
--- /dev/null
+++ b/synapse/config/metrics.py
@@ -0,0 +1,36 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ._base import Config
+
+
class MetricsConfig(Config):
    """Configuration section for the performance-metrics subsystem.

    Exposes two settings parsed from the command line: whether metrics
    collection is enabled at all, and an optional dedicated localhost
    port on which to serve them.
    """

    def __init__(self, args):
        super(MetricsConfig, self).__init__(args)
        # Copy the parsed values onto the config object so the rest of the
        # codebase never touches the argparse namespace directly.
        self.enable_metrics = args.enable_metrics
        self.metrics_port = args.metrics_port

    @classmethod
    def add_arguments(cls, parser):
        """Register the metrics-related command-line options."""
        super(MetricsConfig, cls).add_arguments(parser)
        group = parser.add_argument_group("metrics")
        group.add_argument(
            '--enable-metrics', dest="enable_metrics", action="store_true",
            help="Enable collection and rendering of performance metrics"
        )
        group.add_argument(
            '--metrics-port', metavar="PORT", type=int,
            help="Separate port to accept metrics requests on (on localhost)"
        )
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f131941f45..6811a0e3d1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
from synapse.util.expiringcache import ExpiringCache
from synapse.util.logutils import log_function
from synapse.events import FrozenEvent
+import synapse.metrics
from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
@@ -36,9 +37,17 @@ import random
logger = logging.getLogger(__name__)
+# synapse.federation.federation_client is a silly name
+metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+
+sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
+
+sent_edus_counter = metrics.register_counter("sent_edus")
+
+sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
+
+
class FederationClient(FederationBase):
- def __init__(self):
- self._get_pdu_cache = None
def start_get_pdu_cache(self):
self._get_pdu_cache = ExpiringCache(
@@ -68,6 +77,8 @@ class FederationClient(FederationBase):
order = self._order
self._order += 1
+ sent_pdus_destination_dist.inc_by(len(destinations))
+
logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
# TODO, add errback, etc.
@@ -87,6 +98,8 @@ class FederationClient(FederationBase):
content=content,
)
+ sent_edus_counter.inc()
+
# TODO, add errback, etc.
self._transaction_queue.enqueue_edu(edu)
return defer.succeed(None)
@@ -113,6 +126,8 @@ class FederationClient(FederationBase):
a Deferred which will eventually yield a JSON object from the
response
"""
+ sent_queries_counter.inc(query_type)
+
return self.transport_layer.make_query(
destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
)
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9c7dcdba96..25c0014f97 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -22,6 +22,7 @@ from .units import Transaction, Edu
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.events import FrozenEvent
+import synapse.metrics
from synapse.api.errors import FederationError, SynapseError
@@ -32,6 +33,15 @@ import logging
logger = logging.getLogger(__name__)
+# synapse.federation.federation_server is a silly name
+metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
+
+received_pdus_counter = metrics.register_counter("received_pdus")
+
+received_edus_counter = metrics.register_counter("received_edus")
+
+received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
+
class FederationServer(FederationBase):
def set_handler(self, handler):
@@ -84,6 +94,8 @@ class FederationServer(FederationBase):
def on_incoming_transaction(self, transaction_data):
transaction = Transaction(**transaction_data)
+ received_pdus_counter.inc_by(len(transaction.pdus))
+
for p in transaction.pdus:
if "unsigned" in p:
unsigned = p["unsigned"]
@@ -153,6 +165,8 @@ class FederationServer(FederationBase):
defer.returnValue((200, response))
def received_edu(self, origin, edu_type, content):
+ received_edus_counter.inc()
+
if edu_type in self.edu_handlers:
self.edu_handlers[edu_type](origin, content)
else:
@@ -204,6 +218,8 @@ class FederationServer(FederationBase):
@defer.inlineCallbacks
def on_query_request(self, query_type, args):
+ received_queries_counter.inc(query_type)
+
if query_type in self.query_handlers:
response = yield self.query_handlers[query_type](args)
defer.returnValue((200, response))
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9dc7849b17..4dccd93d0e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.retryutils import (
get_retry_limiter, NotRetryingDestination,
)
+import synapse.metrics
import logging
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
class TransactionQueue(object):
"""This class makes sure we only have one transaction in flight at
@@ -54,11 +57,25 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
+ metrics.register_callback(
+ "pending_destinations",
+ lambda: len(self.pending_transactions),
+ )
+
# Is a mapping from destination -> list of
# tuple(pending pdus, deferred, order)
- self.pending_pdus_by_dest = {}
+ self.pending_pdus_by_dest = pdus = {}
# destination -> list of tuple(edu, deferred)
- self.pending_edus_by_dest = {}
+ self.pending_edus_by_dest = edus = {}
+
+ metrics.register_callback(
+ "pending_pdus",
+ lambda: sum(map(len, pdus.values())),
+ )
+ metrics.register_callback(
+ "pending_edus",
+ lambda: sum(map(len, edus.values())),
+ )
# destination -> list of tuple(failure, deferred)
self.pending_failures_by_dest = {}
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 6c624977d7..7838a81362 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -148,6 +148,10 @@ class BaseFederationServlet(object):
logger.exception("authenticate_request failed")
raise
defer.returnValue(response)
+
+ # Extra logic that functools.wraps() doesn't finish
+ new_code.__self__ = code.__self__
+
return new_code
def register(self, server):
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 28e922f79b..731df00648 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -21,6 +21,7 @@ from synapse.api.constants import PresenceState
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import UserID
+import synapse.metrics
from ._base import BaseHandler
@@ -29,6 +30,8 @@ import logging
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
# TODO(paul): Maybe there's one of these I can steal from somewhere
def partition(l, func):
@@ -133,6 +136,11 @@ class PresenceHandler(BaseHandler):
self._user_cachemap = {}
self._user_cachemap_latest_serial = 0
+ metrics.register_callback(
+ "userCachemap:size",
+ lambda: len(self._user_cachemap),
+ )
+
def _get_or_make_usercache(self, user):
"""If the cache entry doesn't exist, initialise a new one."""
if user not in self._user_cachemap:
diff --git a/synapse/http/client.py b/synapse/http/client.py
index b53a07aa2d..2ae1c4d3a4 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -15,6 +15,7 @@
from synapse.api.errors import CodeMessageException
from syutil.jsonutil import encode_canonical_json
+import synapse.metrics
from twisted.internet import defer, reactor
from twisted.web.client import (
@@ -31,6 +32,17 @@ import urllib
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+outgoing_requests_counter = metrics.register_counter(
+ "requests",
+ labels=["method"],
+)
+incoming_responses_counter = metrics.register_counter(
+ "responses",
+ labels=["method", "code"],
+)
+
class SimpleHttpClient(object):
"""
@@ -45,12 +57,30 @@ class SimpleHttpClient(object):
self.agent = Agent(reactor)
self.version_string = hs.version_string
+ def request(self, method, *args, **kwargs):
+ # A small wrapper around self.agent.request() so we can easily attach
+ # counters to it
+ outgoing_requests_counter.inc(method)
+ d = self.agent.request(method, *args, **kwargs)
+
+ def _cb(response):
+ incoming_responses_counter.inc(method, response.code)
+ return response
+
+ def _eb(failure):
+ incoming_responses_counter.inc(method, "ERR")
+ return failure
+
+ d.addCallbacks(_cb, _eb)
+
+ return d
+
@defer.inlineCallbacks
def post_urlencoded_get_json(self, uri, args={}):
logger.debug("post_urlencoded_get_json args: %s", args)
query_bytes = urllib.urlencode(args, True)
- response = yield self.agent.request(
+ response = yield self.request(
"POST",
uri.encode("ascii"),
headers=Headers({
@@ -70,7 +100,7 @@ class SimpleHttpClient(object):
logger.info("HTTP POST %s -> %s", json_str, uri)
- response = yield self.agent.request(
+ response = yield self.request(
"POST",
uri.encode("ascii"),
headers=Headers({
@@ -104,7 +134,7 @@ class SimpleHttpClient(object):
query_bytes = urllib.urlencode(args, True)
uri = "%s?%s" % (uri, query_bytes)
- response = yield self.agent.request(
+ response = yield self.request(
"GET",
uri.encode("ascii"),
headers=Headers({
@@ -145,7 +175,7 @@ class SimpleHttpClient(object):
json_str = encode_canonical_json(json_body)
- response = yield self.agent.request(
+ response = yield self.request(
"PUT",
uri.encode("ascii"),
headers=Headers({
@@ -176,7 +206,7 @@ class CaptchaServerHttpClient(SimpleHttpClient):
def post_urlencoded_get_raw(self, url, args={}):
query_bytes = urllib.urlencode(args, True)
- response = yield self.agent.request(
+ response = yield self.request(
"POST",
url.encode("ascii"),
bodyProducer=FileBodyProducer(StringIO(query_bytes)),
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 7db001cc63..7fa295cad5 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -23,6 +23,7 @@ from twisted.web._newclient import ResponseDone
from synapse.http.endpoint import matrix_federation_endpoint
from synapse.util.async import sleep
from synapse.util.logcontext import PreserveLoggingContext
+import synapse.metrics
from syutil.jsonutil import encode_canonical_json
@@ -40,6 +41,17 @@ import urlparse
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+outgoing_requests_counter = metrics.register_counter(
+ "requests",
+ labels=["method"],
+)
+incoming_responses_counter = metrics.register_counter(
+ "responses",
+ labels=["method", "code"],
+)
+
class MatrixFederationHttpAgent(_AgentBase):
@@ -49,6 +61,8 @@ class MatrixFederationHttpAgent(_AgentBase):
def request(self, destination, endpoint, method, path, params, query,
headers, body_producer):
+ outgoing_requests_counter.inc(method)
+
host = b""
port = 0
fragment = b""
@@ -59,9 +73,21 @@ class MatrixFederationHttpAgent(_AgentBase):
# Set the connection pool key to be the destination.
key = destination
- return self._requestWithEndpoint(key, endpoint, method, parsed_URI,
- headers, body_producer,
- parsed_URI.originForm)
+ d = self._requestWithEndpoint(key, endpoint, method, parsed_URI,
+ headers, body_producer,
+ parsed_URI.originForm)
+
+ def _cb(response):
+ incoming_responses_counter.inc(method, response.code)
+ return response
+
+ def _eb(failure):
+ incoming_responses_counter.inc(method, "ERR")
+ return failure
+
+ d.addCallbacks(_cb, _eb)
+
+ return d
class MatrixFederationHttpClient(object):
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 767c3ef79b..d77cb77799 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -18,6 +18,7 @@ from synapse.api.errors import (
cs_exception, SynapseError, CodeMessageException, UnrecognizedRequestError
)
from synapse.util.logcontext import LoggingContext
+import synapse.metrics
from syutil.jsonutil import (
encode_canonical_json, encode_pretty_printed_json
@@ -34,6 +35,17 @@ import urllib
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+incoming_requests_counter = metrics.register_counter(
+ "requests",
+ labels=["method", "servlet"],
+)
+outgoing_responses_counter = metrics.register_counter(
+ "responses",
+ labels=["method", "code"],
+)
+
class HttpServer(object):
""" Interface for registering callbacks on a HTTP server
@@ -131,6 +143,15 @@ class JsonResource(HttpServer, resource.Resource):
# returned response. We pass both the request and any
# matched groups from the regex to the callback.
+ callback = path_entry.callback
+
+ servlet_instance = getattr(callback, "__self__", None)
+ if servlet_instance is not None:
+ servlet_classname = servlet_instance.__class__.__name__
+ else:
+ servlet_classname = "%r" % callback
+ incoming_requests_counter.inc(request.method, servlet_classname)
+
args = [
urllib.unquote(u).decode("UTF-8") for u in m.groups()
]
@@ -140,10 +161,7 @@ class JsonResource(HttpServer, resource.Resource):
request.method, request.path
)
- code, response = yield path_entry.callback(
- request,
- *args
- )
+ code, response = yield callback(request, *args)
self._send_response(request, code, response)
return
@@ -190,6 +208,8 @@ class JsonResource(HttpServer, resource.Resource):
request)
return
+ outgoing_responses_counter.inc(request.method, str(code))
+
# TODO: Only enable CORS for the requests that need it.
respond_with_json(
request, code, response_json_object,
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
new file mode 100644
index 0000000000..dffb8a4861
--- /dev/null
+++ b/synapse/metrics/__init__.py
@@ -0,0 +1,111 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# Because otherwise 'resource' collides with synapse.metrics.resource
+from __future__ import absolute_import
+
+import logging
+from resource import getrusage, getpagesize, RUSAGE_SELF
+
+from .metric import (
+ CounterMetric, CallbackMetric, DistributionMetric, CacheMetric
+)
+
+
+logger = logging.getLogger(__name__)
+
+
# We'll keep all the available metrics in a single toplevel dict, one shared
# for the entire process. We don't currently support per-HomeServer instances
# of metrics, because in practice any one python VM will host only one
# HomeServer anyway. This makes a lot of implementation neater
all_metrics = {}


class Metrics(object):
    """ A single Metrics object gives a (mutable) slice view of the all_metrics
    dict, allowing callers to easily register new metrics that are namespaced
    nicely."""

    def __init__(self, name):
        self.name_prefix = name

    def _register(self, metric_class, name, *args, **kwargs):
        # All metrics live in one process-wide registry; qualify the name
        # with this object's prefix before storing it there.
        qualified_name = "%s_%s" % (self.name_prefix, name)

        created = metric_class(qualified_name, *args, **kwargs)

        all_metrics[qualified_name] = created
        return created

    def register_counter(self, *args, **kwargs):
        """Create and register a CounterMetric under this prefix."""
        return self._register(CounterMetric, *args, **kwargs)

    def register_callback(self, *args, **kwargs):
        """Create and register a CallbackMetric under this prefix."""
        return self._register(CallbackMetric, *args, **kwargs)

    def register_distribution(self, *args, **kwargs):
        """Create and register a DistributionMetric under this prefix."""
        return self._register(DistributionMetric, *args, **kwargs)

    def register_cache(self, *args, **kwargs):
        """Create and register a CacheMetric under this prefix."""
        return self._register(CacheMetric, *args, **kwargs)
+
+
def get_metrics_for(pkg_name):
    """ Returns a Metrics instance for conveniently creating metrics
    namespaced with the given name prefix. """

    # Prometheus metric names may not contain ".", so flatten the dotted
    # package path into underscores before using it as a prefix.
    prefix = pkg_name.replace(".", "_")
    return Metrics(prefix)
+
+
def render_all():
    """Render every registered metric into one Prometheus-style text
    document, with a trailing newline."""

    # TODO(paul): Internal hack
    update_resource_metrics()

    lines = []
    for name in sorted(all_metrics):
        try:
            lines.extend(all_metrics[name].render())
        except Exception:
            # A broken metric must not take down the whole page; emit a
            # marker line and log the traceback instead.
            lines.append("# FAILED to render %s" % name)
            logger.exception("Failed to render %s metric", name)

    lines.append("")  # to generate a final CRLF

    return "\n".join(lines)
+
+
# Now register some standard process-wide state metrics, to give indications of
# process resource usage

rusage = None
PAGE_SIZE = getpagesize()  # retained for compatibility; no longer used below


def update_resource_metrics():
    """Refresh the cached getrusage() snapshot read by the callbacks below.

    Called from render_all() so the values are current at scrape time.
    """
    global rusage
    rusage = getrusage(RUSAGE_SELF)

resource_metrics = get_metrics_for("process.resource")

# msecs
resource_metrics.register_callback("utime", lambda: rusage.ru_utime * 1000)
resource_metrics.register_callback("stime", lambda: rusage.ru_stime * 1000)

# bytes; on Linux getrusage(2) reports ru_maxrss in kilobytes, not pages,
# so scale by 1024 rather than by the page size (which inflated it ~4x).
resource_metrics.register_callback("maxrss", lambda: rusage.ru_maxrss * 1024)
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
new file mode 100644
index 0000000000..21b37748f6
--- /dev/null
+++ b/synapse/metrics/metric.py
@@ -0,0 +1,155 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from itertools import chain
+
+
# TODO(paul): I can't believe Python doesn't have one of these
def map_concat(func, items):
    """Apply func to every item and concatenate the resulting iterables
    into one flat list."""
    flattened = []
    for item in items:
        flattened.extend(func(item))
    return flattened
+
+
class BaseMetric(object):
    """Common behaviour for all metric types: a name, an optional list of
    label names, and helpers for rendering Prometheus-style sample keys.

    The default render() requires subclasses to provide a `counts` dict
    keyed by label-value tuples, plus a render_item(key) method.
    """

    def __init__(self, name, labels=None):
        self.name = name
        # Take a copy rather than keeping a mutable default/shared list;
        # absent labels means a scalar (unlabelled) metric.
        self.labels = list(labels) if labels else []

    def dimension(self):
        """Return the number of labels this metric carries."""
        return len(self.labels)

    def is_scalar(self):
        """Return True if the metric has no labels at all."""
        return not len(self.labels)

    def _render_labelvalue(self, value):
        # TODO: some kind of value escape
        return '"%s"' % (value)

    def _render_key(self, values):
        """Render the {label="value",...} suffix for one sample, or "" for
        scalar metrics."""
        if self.is_scalar():
            return ""
        pairs = [
            "%s=%s" % (label, self._render_labelvalue(value))
            for label, value in zip(self.labels, values)
        ]
        return "{%s}" % (",".join(pairs))

    def render(self):
        # NOTE: relies on the subclass contract described in the class
        # docstring (self.counts and render_item()).
        return map_concat(self.render_item, sorted(self.counts.keys()))
+
+
class CounterMetric(BaseMetric):
    """The simplest kind of metric; one that stores a monotonically-increasing
    integer that counts events."""

    def __init__(self, *args, **kwargs):
        super(CounterMetric, self).__init__(*args, **kwargs)

        # Maps label-value tuples to the running count for that series.
        self.counts = {}

        # Scalar metrics are never empty
        if self.is_scalar():
            self.counts[()] = 0

    def inc_by(self, incr, *values):
        """Add `incr` to the counter identified by the given label values.

        Raises:
            ValueError: if the number of label values does not match the
                number of labels declared for this metric.
        """
        if len(values) != self.dimension():
            # Previously the message misleadingly referred to inc().
            raise ValueError(
                "Expected as many values to inc_by() as labels (%d)" % (
                    self.dimension(),
                )
            )

        # TODO: should assert that the tag values are all strings

        self.counts[values] = self.counts.get(values, 0) + incr

    def inc(self, *values):
        """Increment the counter for the given label values by one."""
        self.inc_by(1, *values)

    def render_item(self, k):
        # One exposition line per label-value tuple.
        return ["%s%s %d" % (self.name, self._render_key(k), self.counts[k])]
+
+
class CallbackMetric(BaseMetric):
    """A metric that returns the numeric value returned by a callback whenever
    it is rendered. Typically this is used to implement gauges that yield the
    size or other state of some in-memory object by actively querying it."""

    def __init__(self, name, callback, labels=[]):
        super(CallbackMetric, self).__init__(name, labels=labels)

        self.callback = callback

    def render(self):
        # Poll the callback at render time so the value is always current.
        current = self.callback()

        if self.is_scalar():
            return ["%s %d" % (self.name, current)]

        # Labelled gauges: the callback returns a dict mapping label-value
        # tuples to numeric values.
        return [
            "%s%s %d" % (self.name, self._render_key(key), current[key])
            for key in sorted(current.keys())
        ]
+
+
class DistributionMetric(object):
    """A combination of an event counter and an accumulator, which counts
    both the number of events and accumulates the total value. Typically this
    could be used to keep track of method-running times, or other distributions
    of values that occur in discrete occurrences.

    TODO(paul): Try to export some heatmap-style stats?
    """

    def __init__(self, name, *args, **kwargs):
        # Forward positional extras (e.g. a labels list) to both underlying
        # counters; previously *args was accepted but silently discarded.
        self.counts = CounterMetric(name + ":count", *args, **kwargs)
        self.totals = CounterMetric(name + ":total", *args, **kwargs)

    def inc_by(self, inc, *values):
        """Record one event of size `inc` against the given label values."""
        self.counts.inc(*values)
        self.totals.inc_by(inc, *values)

    def render(self):
        # A distribution renders as its two constituent counters.
        return self.counts.render() + self.totals.render()
+
+
class CacheMetric(object):
    """A combination of two CounterMetrics, one to count cache hits and one to
    count a total, and a callback metric to yield the current size.

    This metric generates standard metric name pairs, so that monitoring rules
    can easily be applied to measure hit ratio."""

    def __init__(self, name, size_callback, labels=[]):
        self.name = name

        # Hits and total share the label set so hit-ratio rules line up.
        self.hits = CounterMetric(name + ":hits", labels=labels)
        self.total = CounterMetric(name + ":total", labels=labels)

        self.size = CallbackMetric(
            name + ":size",
            callback=size_callback,
            labels=labels,
        )

    def inc_hits(self, *values):
        # A hit always counts towards the total as well.
        self.hits.inc(*values)
        self.total.inc(*values)

    def inc_misses(self, *values):
        # Misses only bump the total; the hit counter is untouched.
        self.total.inc(*values)

    def render(self):
        rendered = []
        for metric in (self.hits, self.total, self.size):
            rendered.extend(metric.render())
        return rendered
diff --git a/synapse/metrics/resource.py b/synapse/metrics/resource.py
new file mode 100644
index 0000000000..0af4b3eb52
--- /dev/null
+++ b/synapse/metrics/resource.py
@@ -0,0 +1,39 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.web.resource import Resource
+
+import synapse.metrics
+
+
+METRICS_PREFIX = "/_synapse/metrics"
+
+
class MetricsResource(Resource):
    """Twisted web resource that serves the process-wide metrics page."""

    isLeaf = True

    def __init__(self, hs):
        Resource.__init__(self)  # Resource is old-style, so no super()

        self.hs = hs

    def render_GET(self, request):
        # Render every registered metric into one plain-text document.
        body = synapse.metrics.render_all()

        request.setHeader("Content-Type", "text/plain")
        request.setHeader("Content-Length", str(len(body)))

        # Encode as UTF-8 (default)
        return body.encode()
diff --git a/synapse/notifier.py b/synapse/notifier.py
index df13e8ddb6..7121d659d0 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -19,12 +19,27 @@ from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor
from synapse.types import StreamToken
+import synapse.metrics
import logging
logger = logging.getLogger(__name__)
+metrics = synapse.metrics.get_metrics_for(__name__)
+
+notified_events_counter = metrics.register_counter("notified_events")
+
+
+# TODO(paul): Should be shared somewhere
def count(func, l):
    """Return the number of items in l for which func returns true."""
    return sum(1 for item in l if func(item))
+
class _NotificationListener(object):
""" This represents a single client connection to the events stream.
@@ -59,6 +74,7 @@ class _NotificationListener(object):
try:
self.deferred.callback(result)
+ notified_events_counter.inc_by(len(events))
except defer.AlreadyCalledError:
pass
@@ -95,6 +111,35 @@ class Notifier(object):
"user_joined_room", self._user_joined_room
)
+ # This is not a very cheap test to perform, but it's only executed
+ # when rendering the metrics page, which is likely once per minute at
+ # most when scraping it.
+ def count_listeners():
+ all_listeners = set()
+
+ for x in self.room_to_listeners.values():
+ all_listeners |= x
+ for x in self.user_to_listeners.values():
+ all_listeners |= x
+ for x in self.appservice_to_listeners.values():
+ all_listeners |= x
+
+ return len(all_listeners)
+ metrics.register_callback("listeners", count_listeners)
+
+ metrics.register_callback(
+ "rooms",
+ lambda: count(bool, self.room_to_listeners.values()),
+ )
+ metrics.register_callback(
+ "users",
+ lambda: count(bool, self.user_to_listeners.values()),
+ )
+ metrics.register_callback(
+ "appservices",
+ lambda: count(bool, self.appservice_to_listeners.values()),
+ )
+
@log_function
@defer.inlineCallbacks
def on_new_room_event(self, event, extra_users=[]):
diff --git a/synapse/server.py b/synapse/server.py
index cb8610a1b4..c7772244ba 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -56,6 +56,7 @@ class BaseHomeServer(object):
"""
DEPENDENCIES = [
+ 'config',
'clock',
'http_client',
'db_name',
@@ -79,6 +80,7 @@ class BaseHomeServer(object):
'resource_for_server_key',
'resource_for_media_repository',
'resource_for_app_services',
+ 'resource_for_metrics',
'event_sources',
'ratelimiter',
'keyring',
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 3ea7382760..40f2fc6d76 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -20,6 +20,7 @@ from synapse.events.utils import prune_event
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext, LoggingContext
from synapse.util.lrucache import LruCache
+import synapse.metrics
from twisted.internet import defer
@@ -35,9 +36,22 @@ sql_logger = logging.getLogger("synapse.storage.SQL")
transaction_logger = logging.getLogger("synapse.storage.txn")
+metrics = synapse.metrics.get_metrics_for("synapse.storage")
+
+sql_query_timer = metrics.register_distribution("query_time", labels=["verb"])
+sql_txn_timer = metrics.register_distribution("transaction_time", labels=["desc"])
+sql_getevents_timer = metrics.register_distribution("getEvents_time", labels=["desc"])
+
+caches_by_name = {}
+cache_counter = metrics.register_cache(
+ "cache",
+ lambda: {(name,): len(caches_by_name[name]) for name in caches_by_name.keys()},
+ labels=["name"],
+)
+
+
# TODO(paul):
# * more generic key management
-# * export monitoring stats
# * consider other eviction strategies - LRU?
def cached(max_entries=1000):
""" A method decorator that applies a memoizing cache around the function.
@@ -55,6 +69,9 @@ def cached(max_entries=1000):
"""
def wrap(orig):
cache = OrderedDict()
+ name = orig.__name__
+
+ caches_by_name[name] = cache
def prefill(key, value):
while len(cache) > max_entries:
@@ -65,8 +82,10 @@ def cached(max_entries=1000):
@defer.inlineCallbacks
def wrapped(self, key):
if key in cache:
+ cache_counter.inc_hits(name)
defer.returnValue(cache[key])
+ cache_counter.inc_misses(name)
ret = yield orig(self, key)
prefill(key, ret)
defer.returnValue(ret)
@@ -83,7 +102,8 @@ def cached(max_entries=1000):
class LoggingTransaction(object):
"""An object that almost-transparently proxies for the 'txn' object
- passed to the constructor. Adds logging to the .execute() method."""
+ passed to the constructor. Adds logging and metrics to the .execute()
+ method."""
__slots__ = ["txn", "name"]
def __init__(self, txn, name):
@@ -99,6 +119,7 @@ class LoggingTransaction(object):
def execute(self, sql, *args, **kwargs):
# TODO(paul): Maybe use 'info' and 'debug' for values?
sql_logger.debug("[SQL] {%s} %s", self.name, sql)
+
try:
if args and args[0]:
values = args[0]
@@ -120,8 +141,9 @@ class LoggingTransaction(object):
logger.exception("[SQL FAIL] {%s}", self.name)
raise
finally:
- end = time.time() * 1000
- sql_logger.debug("[SQL time] {%s} %f", self.name, end - start)
+ msecs = (time.time() * 1000) - start
+ sql_logger.debug("[SQL time] {%s} %f", self.name, msecs)
+ sql_query_timer.inc_by(msecs, sql.split()[0])
class PerformanceCounters(object):
@@ -172,11 +194,18 @@ class SQLBaseStore(object):
self._previous_txn_total_time = 0
self._current_txn_total_time = 0
self._previous_loop_ts = 0
+
+ # TODO(paul): These can eventually be removed once the metrics code
+ # is running in mainline, and we have some nice monitoring frontends
+ # to watch it
self._txn_perf_counters = PerformanceCounters()
self._get_event_counters = PerformanceCounters()
self._get_event_cache = LruCache(hs.config.event_cache_size)
+ # Pretend the getEventCache is just another named cache
+ caches_by_name["*getEvent*"] = self._get_event_cache
+
def start_profiling(self):
self._previous_loop_ts = self._clock.time_msec()
@@ -231,13 +260,13 @@ class SQLBaseStore(object):
raise
finally:
end = time.time() * 1000
- transaction_logger.debug(
- "[TXN END] {%s} %f",
- name, end - start
- )
+ duration = end - start
+
+ transaction_logger.debug("[TXN END] {%s} %f", name, duration)
- self._current_txn_total_time += end - start
+ self._current_txn_total_time += duration
self._txn_perf_counters.update(desc, start, end)
+ sql_txn_timer.inc_by(duration, desc)
with PreserveLoggingContext():
result = yield self._db_pool.runInteraction(
@@ -638,14 +667,22 @@ class SQLBaseStore(object):
get_prev_content=False, allow_rejected=False):
start_time = time.time() * 1000
- update_counter = self._get_event_counters.update
+
+ def update_counter(desc, last_time):
+ curr_time = self._get_event_counters.update(desc, last_time)
+ sql_getevents_timer.inc_by(curr_time - last_time, desc)
+ return curr_time
cache = self._get_event_cache.setdefault(event_id, {})
try:
# Separate cache entries for each way to invoke _get_event_txn
- return cache[(check_redacted, get_prev_content, allow_rejected)]
+ ret = cache[(check_redacted, get_prev_content, allow_rejected)]
+
+ cache_counter.inc_hits("*getEvent*")
+ return ret
except KeyError:
+ cache_counter.inc_misses("*getEvent*")
pass
finally:
start_time = update_counter("event_cache", start_time)
@@ -685,7 +722,11 @@ class SQLBaseStore(object):
check_redacted=True, get_prev_content=False):
start_time = time.time() * 1000
- update_counter = self._get_event_counters.update
+
+ def update_counter(desc, last_time):
+ curr_time = self._get_event_counters.update(desc, last_time)
+ sql_getevents_timer.inc_by(curr_time - last_time, desc)
+ return curr_time
d = json.loads(js)
start_time = update_counter("decode_json", start_time)
diff --git a/synapse/util/lrucache.py b/synapse/util/lrucache.py
index f115f50e50..65d5792907 100644
--- a/synapse/util/lrucache.py
+++ b/synapse/util/lrucache.py
@@ -16,7 +16,6 @@
class LruCache(object):
"""Least-recently-used cache."""
- # TODO(mjark) Add hit/miss counters
# TODO(mjark) Add mutex for linked list for thread safety.
def __init__(self, max_size):
cache = {}
|