diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/tls.py | 2 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 4 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 7 | ||||
-rw-r--r-- | synapse/http/server.py | 56 | ||||
-rw-r--r-- | synapse/metrics/metric.py | 81 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 19 | ||||
-rw-r--r-- | synapse/storage/events.py | 9 | ||||
-rw-r--r-- | synapse/util/metrics.py | 53 |
8 files changed, 185 insertions, 46 deletions
diff --git a/synapse/config/tls.py b/synapse/config/tls.py index 4748f71c2f..29eb012ddb 100644 --- a/synapse/config/tls.py +++ b/synapse/config/tls.py @@ -96,7 +96,7 @@ class TlsConfig(Config): # certificates returned by this server match one of the fingerprints. # # Synapse automatically adds the fingerprint of its own certificate - # to the list. So if federation traffic is handle directly by synapse + # to the list. So if federation traffic is handled directly by synapse # then no modification to the list is required. # # If synapse is run behind a load balancer that handles the TLS then it diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index 3e7809b04f..9d39f46583 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -42,6 +42,8 @@ sent_edus_counter = client_metrics.register_counter("sent_edus") sent_transactions_counter = client_metrics.register_counter("sent_transactions") +events_processed_counter = client_metrics.register_counter("events_processed") + class TransactionQueue(object): """This class makes sure we only have one transaction in flight at @@ -205,6 +207,8 @@ class TransactionQueue(object): self._send_pdu(event, destinations) + events_processed_counter.inc_by(len(events)) + yield self.store.update_federation_out_pos( "events", next_token ) diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index feca3e4c10..3dd3fa2a27 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -15,6 +15,7 @@ from twisted.internet import defer +import synapse from synapse.api.constants import EventTypes from synapse.util.metrics import Measure from synapse.util.logcontext import make_deferred_yieldable, preserve_fn @@ -23,6 +24,10 @@ import logging logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +events_processed_counter = metrics.register_counter("events_processed") + def log_failure(failure): logger.error( @@ -103,6 +108,8 @@ class ApplicationServicesHandler(object): service, event ) + events_processed_counter.inc_by(len(events)) + yield self.store.set_appservice_last_pos(upper_bound) finally: self.is_processing = False diff --git a/synapse/http/server.py b/synapse/http/server.py index 6e8f4c9c5f..269b65ca41 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -42,34 +42,62 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -incoming_requests_counter = metrics.register_counter( - "requests", +# total number of responses served, split by method/servlet/tag +response_count = metrics.register_counter( + "response_count", labels=["method", "servlet", "tag"], + alternative_names=( + # the following are all deprecated aliases for the same metric + metrics.name_prefix + x for x in ( + "_requests", + "_response_time:count", + "_response_ru_utime:count", + "_response_ru_stime:count", + "_response_db_txn_count:count", + "_response_db_txn_duration:count", + ) + ) ) + outgoing_responses_counter = metrics.register_counter( "responses", labels=["method", "code"], ) -response_timer = metrics.register_distribution( - "response_time", - labels=["method", "servlet", "tag"] +response_timer = metrics.register_counter( + "response_time_seconds", + labels=["method", "servlet", "tag"], + alternative_names=( + metrics.name_prefix + "_response_time:total", + ), ) -response_ru_utime = metrics.register_distribution( - "response_ru_utime", labels=["method", "servlet", "tag"] +response_ru_utime = metrics.register_counter( + "response_ru_utime_seconds", labels=["method", "servlet", "tag"], + alternative_names=( + metrics.name_prefix + "_response_ru_utime:total", + ), ) -response_ru_stime = metrics.register_distribution( - "response_ru_stime", labels=["method", "servlet", "tag"] +response_ru_stime = metrics.register_counter( + "response_ru_stime_seconds", labels=["method", "servlet", "tag"], + alternative_names=( + metrics.name_prefix + "_response_ru_stime:total", + ), ) -response_db_txn_count = metrics.register_distribution( - "response_db_txn_count", labels=["method", "servlet", "tag"] +response_db_txn_count = metrics.register_counter( + "response_db_txn_count", labels=["method", "servlet", "tag"], + alternative_names=( + metrics.name_prefix + "_response_db_txn_count:total", + ), ) -response_db_txn_duration = metrics.register_distribution( - "response_db_txn_duration", labels=["method", "servlet", "tag"] +response_db_txn_duration = metrics.register_counter( + "response_db_txn_duration_seconds", labels=["method", "servlet", "tag"], + alternative_names=( + metrics.name_prefix + "_response_db_txn_duration:total", + ), ) @@ -330,7 +358,7 @@ class RequestMetrics(object): ) return - incoming_requests_counter.inc(request.method, self.name, tag) + response_count.inc(request.method, self.name, tag) response_timer.inc_by( clock.time_msec() - self.start, request.method, diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py index 1d054dd557..f480aae614 100644 --- a/synapse/metrics/metric.py +++ b/synapse/metrics/metric.py @@ -17,16 +17,33 @@ from itertools import chain -# TODO(paul): I can't believe Python doesn't have one of these -def map_concat(func, items): - # flatten a list-of-lists - return list(chain.from_iterable(map(func, items))) +def flatten(items): + """Flatten a list of lists + + Args: + items: iterable[iterable[X]] + + Returns: + list[X]: flattened list + """ + return list(chain.from_iterable(items)) class BaseMetric(object): + """Base class for metrics which report a single value per label set + """ - def __init__(self, name, labels=[]): - self.name = name + def __init__(self, name, labels=[], alternative_names=[]): + """ + Args: + name (str): principal name for this metric + labels (list(str)): names of the labels which will be reported + for this metric + alternative_names (iterable(str)): list of alternative names for + this metric. This can be useful to provide a migration path + when renaming metrics. + """ + self._names = [name] + list(alternative_names) self.labels = labels # OK not to clone as we never write it def dimension(self): @@ -36,7 +53,7 @@ class BaseMetric(object): return not len(self.labels) def _render_labelvalue(self, value): - # TODO: some kind of value escape + # TODO: escape backslashes, quotes and newlines return '"%s"' % (value) def _render_key(self, values): @@ -47,6 +64,36 @@ class BaseMetric(object): for k, v in zip(self.labels, values)]) ) + def _render_for_labels(self, label_values, value): + """Render this metric for a single set of labels + + Args: + label_values (list[str]): values for each of the labels + value: value of the metric at with these labels + + Returns: + iterable[str]: rendered metric + """ + rendered_labels = self._render_key(label_values) + return ( + "%s%s %.12g" % (name, rendered_labels, value) + for name in self._names + ) + + def render(self): + """Render this metric + + Each metric is rendered as: + + name{label1="val1",label2="val2"} value + + https://prometheus.io/docs/instrumenting/exposition_formats/#text-format-details + + Returns: + iterable[str]: rendered metrics + """ + raise NotImplementedError() + class CounterMetric(BaseMetric): """The simplest kind of metric; one that stores a monotonically-increasing @@ -62,6 +109,10 @@ class CounterMetric(BaseMetric): def __init__(self, *args, **kwargs): super(CounterMetric, self).__init__(*args, **kwargs) + # dict[list[str]]: value for each set of label values. the keys are the + # label values, in the same order as the labels in self.labels. + # + # (if the metric is a scalar, the (single) key is the empty list). self.counts = {} # Scalar metrics are never empty @@ -84,11 +135,11 @@ class CounterMetric(BaseMetric): def inc(self, *values): self.inc_by(1, *values) - def render_item(self, k): - return ["%s%s %.12g" % (self.name, self._render_key(k), self.counts[k])] - def render(self): - return map_concat(self.render_item, sorted(self.counts.keys())) + return flatten( + self._render_for_labels(k, self.counts[k]) + for k in sorted(self.counts.keys()) + ) class CallbackMetric(BaseMetric): @@ -105,10 +156,12 @@ class CallbackMetric(BaseMetric): value = self.callback() if self.is_scalar(): - return ["%s %.12g" % (self.name, value)] + return list(self._render_for_labels([], value)) - return ["%s%s %.12g" % (self.name, self._render_key(k), value[k]) - for k in sorted(value.keys())] + return flatten( + self._render_for_labels(k, value[k]) + for k in sorted(value.keys()) + ) class DistributionMetric(object): diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index d59503b905..0a9a290af4 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -517,25 +517,28 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_error("Wrong remote") def on_RDATA(self, cmd): + stream_name = cmd.stream_name + inbound_rdata_count.inc(stream_name) + try: - row = STREAMS_MAP[cmd.stream_name].ROW_TYPE(*cmd.row) + row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row) except Exception: logger.exception( "[%s] Failed to parse RDATA: %r %r", - self.id(), cmd.stream_name, cmd.row + self.id(), stream_name, cmd.row ) raise if cmd.token is None: # I.e. this is part of a batch of updates for this stream. Batch # until we get an update for the stream with a non None token - self.pending_batches.setdefault(cmd.stream_name, []).append(row) + self.pending_batches.setdefault(stream_name, []).append(row) else: # Check if this is the last of a batch of updates - rows = self.pending_batches.pop(cmd.stream_name, []) + rows = self.pending_batches.pop(stream_name, []) rows.append(row) - self.handler.on_rdata(cmd.stream_name, cmd.token, rows) + self.handler.on_rdata(stream_name, cmd.token, rows) def on_POSITION(self, cmd): self.handler.on_position(cmd.stream_name, cmd.token) @@ -644,3 +647,9 @@ metrics.register_callback( }, labels=["command", "name", "conn_id"], ) + +# number of updates received for each RDATA stream +inbound_rdata_count = metrics.register_counter( + "inbound_rdata_count", + labels=["stream_name"], +) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index d08f7571d7..ad1d782705 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -146,6 +146,9 @@ class _EventPeristenceQueue(object): try: queue = self._get_drainining_queue(room_id) for item in queue: + # handle_queue_loop runs in the sentinel logcontext, so + # there is no need to preserve_fn when running the + # callbacks on the deferred. try: ret = yield per_item_callback(item) item.deferred.callback(ret) @@ -157,7 +160,11 @@ class _EventPeristenceQueue(object): self._event_persist_queues[room_id] = queue self._currently_persisting_rooms.discard(room_id) - preserve_fn(handle_queue_loop)() + # set handle_queue_loop off on the background. We don't want to + # attribute work done in it to the current request, so we drop the + # logcontext altogether. + with PreserveLoggingContext(): + handle_queue_loop() def _get_drainining_queue(self, room_id): queue = self._event_persist_queues.setdefault(room_id, deque()) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 4ea930d3e8..8d22ff3068 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -27,25 +27,56 @@ logger = logging.getLogger(__name__) metrics = synapse.metrics.get_metrics_for(__name__) -block_timer = metrics.register_distribution( - "block_timer", - labels=["block_name"] +# total number of times we have hit this block +response_count = metrics.register_counter( + "block_count", + labels=["block_name"], + alternative_names=( + # the following are all deprecated aliases for the same metric + metrics.name_prefix + x for x in ( + "_block_timer:count", + "_block_ru_utime:count", + "_block_ru_stime:count", + "_block_db_txn_count:count", + "_block_db_txn_duration:count", + ) + ) +) + +block_timer = metrics.register_counter( + "block_time_seconds", + labels=["block_name"], + alternative_names=( + metrics.name_prefix + "_block_timer:total", + ), ) -block_ru_utime = metrics.register_distribution( - "block_ru_utime", labels=["block_name"] +block_ru_utime = metrics.register_counter( + "block_ru_utime_seconds", labels=["block_name"], + alternative_names=( + metrics.name_prefix + "_block_ru_utime:total", + ), ) -block_ru_stime = metrics.register_distribution( - "block_ru_stime", labels=["block_name"] +block_ru_stime = metrics.register_counter( + "block_ru_stime_seconds", labels=["block_name"], + alternative_names=( + metrics.name_prefix + "_block_ru_stime:total", + ), ) -block_db_txn_count = metrics.register_distribution( - "block_db_txn_count", labels=["block_name"] +block_db_txn_count = metrics.register_counter( + "block_db_txn_count", labels=["block_name"], + alternative_names=( + metrics.name_prefix + "_block_db_txn_count:total", + ), ) -block_db_txn_duration = metrics.register_distribution( - "block_db_txn_duration", labels=["block_name"] +block_db_txn_duration = metrics.register_counter( + "block_db_txn_duration_seconds", labels=["block_name"], + alternative_names=( + metrics.name_prefix + "_block_db_txn_count:total", + ), ) |