From df9f72d9e5fe264b86005208e0f096156eb03e4b Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Mon, 21 May 2018 19:47:37 -0500 Subject: replacing portions --- synapse/federation/federation_client.py | 17 +++++------- synapse/federation/federation_server.py | 16 +++++------ synapse/federation/send_queue.py | 8 ++---- synapse/federation/transaction_queue.py | 47 +++++++++++++-------------------- 4 files changed, 35 insertions(+), 53 deletions(-) (limited to 'synapse/federation') diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6163f7c466..2761ffae07 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,20 +32,17 @@ from synapse.federation.federation_base import ( FederationBase, event_from_pdu_json, ) -import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination -logger = logging.getLogger(__name__) - +from prometheus_client import Counter -# synapse.federation.federation_client is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +logger = logging.getLogger(__name__) -sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) PDU_RETRY_TIME_MS = 1 * 60 * 1000 @@ -108,7 +105,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc(query_type) + sent_queries_counter.labels(query_type).inc() return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, @@ -127,7 +124,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_device_keys") + sent_queries_counter.labels("client_device_keys").inc() return self.transport_layer.query_client_keys( destination, content, timeout ) @@ -137,7 +134,7 @@ class FederationClient(FederationBase): """Query the device keys for a list of user ids hosted on a remote server. """ - sent_queries_counter.inc("user_devices") + sent_queries_counter.labels("user_devices").inc() return self.transport_layer.query_user_devices( destination, user_id, timeout ) @@ -154,7 +151,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_one_time_keys") + sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys( destination, content, timeout ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 247ddc89d5..8211273006 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,12 +27,13 @@ from synapse.federation.federation_base import ( from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -import synapse.metrics from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache from synapse.util.logutils import log_function +from prometheus_client import Counter + from six import iteritems # when processing incoming transactions, we try to handle multiple rooms in @@ -41,14 +42,11 @@ TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) -# synapse.federation.federation_server is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.server") - -received_pdus_counter = metrics.register_counter("received_pdus") +received_pdus_counter = Counter("synapse_federation_server_received_pdus", "") -received_edus_counter = metrics.register_counter("received_edus") +received_edus_counter = Counter("synapse_federation_server_received_edus", "") -received_queries_counter = metrics.register_counter("received_queries", labels=["type"]) +received_queries_counter = Counter("synapse_federation_server_received_queries", "", ["type"]) class FederationServer(FederationBase): @@ -131,7 +129,7 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - received_pdus_counter.inc_by(len(transaction.pdus)) + received_pdus_counter.inc(len(transaction.pdus)) pdus_by_room = {} @@ -292,7 +290,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): - received_queries_counter.inc(query_type) + received_queries_counter.labels(query_type).inc() resp = yield self.registry.on_query(query_type, args) defer.returnValue((200, resp)) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 0f0c687b37..e6e1888f3a 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -33,7 +33,7 @@ from .units import Edu from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure -import synapse.metrics +from synapse.metrics import LaterGauge from blist import sorteddict from collections import namedtuple @@ -45,9 +45,6 @@ from six import itervalues, iteritems logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -77,8 +74,7 @@ class FederationRemoteSendQueue(object): # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. def register(name, queue): - metrics.register_callback( - queue_name + "_size", + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), "", lambda: len(queue), ) diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ded2b1871a..778924a13c 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,23 +26,18 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics +from synapse.metrics import LaterGauge +from synapse.metrics import ( + sent_edus_counter, sent_transactions_counter, events_processed_counter) + +from prometheus_client import Counter import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = client_metrics.register_distribution( - "sent_pdu_destinations" -) -sent_edus_counter = client_metrics.register_counter("sent_edus") - -sent_transactions_counter = client_metrics.register_counter("sent_transactions") - -events_processed_counter = client_metrics.register_counter("events_processed") +sent_pdus_destination_dist = Counter("synapse_federation_client_sent_pdu_destinations", "") class TransactionQueue(object): @@ -69,8 +64,7 @@ class TransactionQueue(object): # done self.pending_transactions = {} - metrics.register_callback( - "pending_destinations", + LaterGauge("pending_destinations", "", [], lambda: len(self.pending_transactions), ) @@ -94,12 +88,12 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - metrics.register_callback( - "pending_pdus", + LaterGauge( + "pending_pdus", "", [], lambda: sum(map(len, pdus.values())), ) - metrics.register_callback( - "pending_edus", + LaterGauge( + "pending_edus", "", [], lambda: ( sum(map(len, edus.values())) + sum(map(len, presence.values())) @@ -241,18 +235,15 @@ class TransactionQueue(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_lag.set( - now - ts, "federation_sender", - ) - synapse.metrics.event_processing_last_ts.set( - ts, "federation_sender", - ) + synapse.metrics.event_processing_lag.labels( + "federation_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "federation_sender").set(ts) - events_processed_counter.inc_by(len(events)) + events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_positions.set( - next_token, "federation_sender", - ) + synapse.metrics.event_processing_positions.labels( + "federation_sender").set(next_token) finally: self._is_processing = False @@ -275,7 +266,7 @@ class TransactionQueue(object): if not destinations: return - sent_pdus_destination_dist.inc_by(len(destinations)) + sent_pdus_destination_dist.inc(len(destinations)) for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( -- cgit 1.4.1