diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 21 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 19 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 13 | ||||
-rw-r--r-- | synapse/federation/transaction_queue.py | 63 |
4 files changed, 57 insertions, 59 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 6163f7c466..87a92f6ea9 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -32,20 +32,17 @@ from synapse.federation.federation_base import ( FederationBase, event_from_pdu_json, ) -import synapse.metrics from synapse.util import logcontext, unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination -logger = logging.getLogger(__name__) - +from prometheus_client import Counter -# synapse.federation.federation_client is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.client") +logger = logging.getLogger(__name__) -sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"]) +sent_queries_counter = Counter("synapse_federation_client_sent_queries", "", ["type"]) PDU_RETRY_TIME_MS = 1 * 60 * 1000 @@ -108,7 +105,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc(query_type) + sent_queries_counter.labels(query_type).inc() return self.transport_layer.make_query( destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail, @@ -127,7 +124,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_device_keys") + sent_queries_counter.labels("client_device_keys").inc() return self.transport_layer.query_client_keys( destination, content, timeout ) @@ -137,7 +134,7 @@ class FederationClient(FederationBase): """Query the device keys for a list of user ids hosted on a remote server. """ - sent_queries_counter.inc("user_devices") + sent_queries_counter.labels("user_devices").inc() return self.transport_layer.query_user_devices( destination, user_id, timeout ) @@ -154,7 +151,7 @@ class FederationClient(FederationBase): a Deferred which will eventually yield a JSON object from the response """ - sent_queries_counter.inc("client_one_time_keys") + sent_queries_counter.labels("client_one_time_keys").inc() return self.transport_layer.claim_client_keys( destination, content, timeout ) @@ -394,7 +391,7 @@ class FederationClient(FederationBase): """ if return_local: seen_events = yield self.store.get_events(event_ids, allow_rejected=True) - signed_events = seen_events.values() + signed_events = list(seen_events.values()) else: seen_events = yield self.store.have_seen_events(event_ids) signed_events = [] @@ -592,7 +589,7 @@ class FederationClient(FederationBase): } valid_pdus = yield self._check_sigs_and_hash_and_fetch( - destination, pdus.values(), + destination, list(pdus.values()), outlier=True, ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 247ddc89d5..2d420a58a2 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -27,12 +27,13 @@ from synapse.federation.federation_base import ( from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction -import synapse.metrics from synapse.types import get_domain_from_id from synapse.util import async from synapse.util.caches.response_cache import ResponseCache from synapse.util.logutils import log_function +from prometheus_client import Counter + from six import iteritems # when processing incoming transactions, we try to handle multiple rooms in @@ -41,17 +42,17 @@ TRANSACTION_CONCURRENCY_LIMIT = 10 logger = logging.getLogger(__name__) -# synapse.federation.federation_server is a silly name -metrics = synapse.metrics.get_metrics_for("synapse.federation.server") - -received_pdus_counter = metrics.register_counter("received_pdus") +received_pdus_counter = Counter("synapse_federation_server_received_pdus", "") -received_edus_counter = metrics.register_counter("received_edus") +received_edus_counter = Counter("synapse_federation_server_received_edus", "") -received_queries_counter = metrics.register_counter("received_queries", labels=["type"]) +received_queries_counter = Counter( + "synapse_federation_server_received_queries", "", ["type"] +) class FederationServer(FederationBase): + def __init__(self, hs): super(FederationServer, self).__init__(hs) @@ -131,7 +132,7 @@ class FederationServer(FederationBase): logger.debug("[%s] Transaction is new", transaction.transaction_id) - received_pdus_counter.inc_by(len(transaction.pdus)) + received_pdus_counter.inc(len(transaction.pdus)) pdus_by_room = {} @@ -292,7 +293,7 @@ class FederationServer(FederationBase): @defer.inlineCallbacks def on_query_request(self, query_type, args): - received_queries_counter.inc(query_type) + received_queries_counter.labels(query_type).inc() resp = yield self.registry.on_query(query_type, args) defer.returnValue((200, resp)) diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 0f0c687b37..9f1142b5a9 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -33,7 +33,7 @@ from .units import Edu from synapse.storage.presence import UserPresenceState from synapse.util.metrics import Measure -import synapse.metrics +from synapse.metrics import LaterGauge from blist import sorteddict from collections import namedtuple @@ -45,9 +45,6 @@ from six import itervalues, iteritems logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - - class FederationRemoteSendQueue(object): """A drop in replacement for TransactionQueue""" @@ -77,10 +74,8 @@ class FederationRemoteSendQueue(object): # lambda binds to the queue rather than to the name of the queue which # changes. ARGH. def register(name, queue): - metrics.register_callback( - queue_name + "_size", - lambda: len(queue), - ) + LaterGauge("synapse_federation_send_queue_%s_size" % (queue_name,), + "", [], lambda: len(queue)) for queue_name in [ "presence_map", "presence_changed", "keyed_edu", "keyed_edu_changed", @@ -202,7 +197,7 @@ class FederationRemoteSendQueue(object): # We only want to send presence for our own users, so lets always just # filter here just in case. - local_states = filter(lambda s: self.is_mine_id(s.user_id), states) + local_states = list(filter(lambda s: self.is_mine_id(s.user_id), states)) self.presence_map.update({state.user_id: state for state in local_states}) self.presence_changed[pos] = [state.user_id for state in local_states] diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index ded2b1871a..f0aeb5a0d3 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -26,23 +26,25 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from synapse.util.metrics import measure_func from synapse.handlers.presence import format_user_presence_state, get_interested_remotes import synapse.metrics +from synapse.metrics import LaterGauge +from synapse.metrics import ( + sent_edus_counter, + sent_transactions_counter, + events_processed_counter, +) + +from prometheus_client import Counter + +from six import itervalues import logging logger = logging.getLogger(__name__) -metrics = synapse.metrics.get_metrics_for(__name__) - -client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client") -sent_pdus_destination_dist = client_metrics.register_distribution( - "sent_pdu_destinations" +sent_pdus_destination_dist = Counter( + "synapse_federation_transaction_queue_sent_pdu_destinations", "" ) -sent_edus_counter = client_metrics.register_counter("sent_edus") - -sent_transactions_counter = client_metrics.register_counter("sent_transactions") - -events_processed_counter = client_metrics.register_counter("events_processed") class TransactionQueue(object): @@ -69,8 +71,10 @@ class TransactionQueue(object): # done self.pending_transactions = {} - metrics.register_callback( - "pending_destinations", + LaterGauge( + "synapse_federation_transaction_queue_pending_destinations", + "", + [], lambda: len(self.pending_transactions), ) @@ -94,12 +98,16 @@ class TransactionQueue(object): # Map of destination -> (edu_type, key) -> Edu self.pending_edus_keyed_by_dest = edus_keyed = {} - metrics.register_callback( - "pending_pdus", + LaterGauge( + "synapse_federation_transaction_queue_pending_pdus", + "", + [], lambda: sum(map(len, pdus.values())), ) - metrics.register_callback( - "pending_edus", + LaterGauge( + "synapse_federation_transaction_queue_pending_edus", + "", + [], lambda: ( sum(map(len, edus.values())) + sum(map(len, presence.values())) @@ -228,7 +236,7 @@ class TransactionQueue(object): yield logcontext.make_deferred_yieldable(defer.gatherResults( [ logcontext.run_in_background(handle_room_events, evs) - for evs in events_by_room.itervalues() + for evs in itervalues(events_by_room) ], consumeErrors=True )) @@ -241,18 +249,15 @@ class TransactionQueue(object): now = self.clock.time_msec() ts = yield self.store.get_received_ts(events[-1].event_id) - synapse.metrics.event_processing_lag.set( - now - ts, "federation_sender", - ) - synapse.metrics.event_processing_last_ts.set( - ts, "federation_sender", - ) + synapse.metrics.event_processing_lag.labels( + "federation_sender").set(now - ts) + synapse.metrics.event_processing_last_ts.labels( + "federation_sender").set(ts) - events_processed_counter.inc_by(len(events)) + events_processed_counter.inc(len(events)) - synapse.metrics.event_processing_positions.set( - next_token, "federation_sender", - ) + synapse.metrics.event_processing_positions.labels( + "federation_sender").set(next_token) finally: self._is_processing = False @@ -275,7 +280,7 @@ class TransactionQueue(object): if not destinations: return - sent_pdus_destination_dist.inc_by(len(destinations)) + sent_pdus_destination_dist.inc(len(destinations)) for destination in destinations: self.pending_pdus_by_dest.setdefault(destination, []).append( @@ -322,7 +327,7 @@ class TransactionQueue(object): if not states_map: break - yield self._process_presence_inner(states_map.values()) + yield self._process_presence_inner(list(states_map.values())) except Exception: logger.exception("Error sending presence states to servers") finally: |